aurora-commits mailing list archives

From zma...@apache.org
Subject [2/2] aurora git commit: Change job updates to rely on `health-checks` rather than on `watch_secs`.
Date Thu, 17 Nov 2016 21:59:52 GMT
Change job updates to rely on `health-checks` rather than on `watch_secs`.

Make RUNNING a first-class state that indicates the task is running
and healthy. This is achieved by introducing a new configuration
parameter, `min_consecutive_successes`, which dictates when to move
a task into the RUNNING state.

With this change, it is possible to set `watch_secs` to 0, so that
updates are based purely on the task's health rather than on watching
the task remain in the RUNNING state for a pre-determined timeout.
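
For illustration, a minimal job config that opts into health-check driven updates might look
like the sketch below. It uses the `HealthCheckConfig` and `UpdateConfig` fields touched by
this change; the job name, role, and resource values are placeholders.

# Illustrative Aurora (pystachio) config; only the UpdateConfig and
# HealthCheckConfig fields shown here are affected by this change, everything
# else is a placeholder.
hello_world = Job(
  name='hello_world', role='john_doe', cluster='test-cluster',
  # watch_secs=0: instance success is decided purely by health checks.
  update_config=UpdateConfig(watch_secs=0),
  # Stay in STARTING until 2 consecutive health checks pass; tolerate up to
  # 2 consecutive failures once RUNNING.
  health_check_config=HealthCheckConfig(
    initial_interval_secs=15,
    interval_secs=10,
    max_consecutive_failures=2,
    min_consecutive_successes=2),
  task=Task(name='main', processes=[],
            resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB)))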

Testing Done:
build-support/jenkins/build.sh
sh ./src/test/sh/org/apache/aurora/e2e/test_end_to_end.sh

Bugs closed: AURORA-1225

Reviewed at https://reviews.apache.org/r/53590/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/2992c8b4
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/2992c8b4
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/2992c8b4

Branch: refs/heads/master
Commit: 2992c8b4dec3294293b2130c49a8836e070bceae
Parents: 05f082a
Author: Santhosh Kumar Shanmugham <santhoshkumar.s@gmail.com>
Authored: Thu Nov 17 13:59:35 2016 -0800
Committer: Zameer Manji <zmanji@apache.org>
Committed: Thu Nov 17 13:59:35 2016 -0800

----------------------------------------------------------------------
 RELEASE-NOTES.md                                |   4 +
 docs/development/design-documents.md            |   2 +-
 docs/features/job-updates.md                    |  13 +-
 docs/features/services.md                       |  17 +
 docs/reference/configuration.md                 |   3 +-
 docs/reference/task-lifecycle.md                |   4 +-
 .../apache/aurora/client/api/updater_util.py    |   4 +-
 src/main/python/apache/aurora/client/config.py  |  23 +-
 .../python/apache/aurora/config/schema/base.py  |  11 +-
 .../apache/aurora/executor/aurora_executor.py   |  11 +-
 .../aurora/executor/common/health_checker.py    | 147 +++-
 .../aurora/executor/common/status_checker.py    |  53 +-
 .../apache/aurora/executor/status_manager.py    |  31 +-
 .../apache/aurora/client/cli/test_inspect.py    |   3 +-
 .../python/apache/aurora/client/test_config.py  |  44 +-
 .../executor/common/test_health_checker.py      | 681 ++++++++++++++++---
 .../executor/common/test_status_checker.py      | 116 +++-
 .../aurora/executor/test_status_manager.py      |  39 +-
 .../aurora/executor/test_thermos_executor.py    |  30 +-
 .../apache/aurora/e2e/http/http_example.aurora  |  14 +-
 .../http/http_example_bad_healthcheck.aurora    |  10 +-
 .../aurora/e2e/http/http_example_updated.aurora |  26 +-
 .../sh/org/apache/aurora/e2e/http_example.py    |  27 +-
 .../sh/org/apache/aurora/e2e/test_end_to_end.sh |   6 +-
 24 files changed, 1102 insertions(+), 217 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/RELEASE-NOTES.md
----------------------------------------------------------------------
diff --git a/RELEASE-NOTES.md b/RELEASE-NOTES.md
index 3924962..96926f4 100644
--- a/RELEASE-NOTES.md
+++ b/RELEASE-NOTES.md
@@ -16,6 +16,10 @@
   can be used in the Aurora job configuration to resolve a docker image specified by its `name:tag`
   to a concrete identifier specified by its `registry/name@digest`. It requires version 2 of the
   Docker Registry.
+- Use `RUNNING` state to indicate that the task is healthy and behaving as expected. Job updates
+  can now rely purely on health checks rather than the `watch_secs` timeout when deciding an
+  individual instance's update state, by setting `watch_secs` to 0. A service will remain in the
+  `STARTING` state until `min_consecutive_successes` consecutive health checks have passed.
 
 ### Deprecations and removals:
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/docs/development/design-documents.md
----------------------------------------------------------------------
diff --git a/docs/development/design-documents.md b/docs/development/design-documents.md
index 6bfc679..c942643 100644
--- a/docs/development/design-documents.md
+++ b/docs/development/design-documents.md
@@ -11,7 +11,7 @@ Current and past documents:
 * [Command Hooks for the Aurora Client](design/command-hooks.md)
 * [Dynamic Reservations](https://docs.google.com/document/d/19gV8Po6DIHO14tOC7Qouk8RnboY8UCfRTninwn_5-7c/edit)
 * [GPU Resources in Aurora](https://docs.google.com/document/d/1J9SIswRMpVKQpnlvJAMAJtKfPP7ZARFknuyXl-2aZ-M/edit)
-* [Health Checks for Updates](https://docs.google.com/document/d/1ZdgW8S4xMhvKW7iQUX99xZm10NXSxEWR0a-21FP5d94/edit)
+* [Health Checks for Updates](https://docs.google.com/document/d/1KOO0LC046k75TqQqJ4c0FQcVGbxvrn71E10wAjMorVY/edit)
 * [JobUpdateDiff thrift API](https://docs.google.com/document/d/1Fc_YhhV7fc4D9Xv6gJzpfooxbK4YWZcvzw6Bd3qVTL8/edit)
 * [REST API RFC](https://docs.google.com/document/d/11_lAsYIRlD5ETRzF2eSd3oa8LXAHYFD8rSetspYXaf4/edit)
 * [Revocable Mesos offers in Aurora](https://docs.google.com/document/d/1r1WCHgmPJp5wbrqSZLsgtxPNj3sULfHrSFmxp2GyPTo/edit)

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/docs/features/job-updates.md
----------------------------------------------------------------------
diff --git a/docs/features/job-updates.md b/docs/features/job-updates.md
index 792f2ae..60968ae 100644
--- a/docs/features/job-updates.md
+++ b/docs/features/job-updates.md
@@ -34,7 +34,7 @@ You may `abort` a job update regardless of the state it is in. This will
 instruct the scheduler to completely abandon the job update and leave the job
 in the current (possibly partially-updated) state.
 
-For a configuration update, the Aurora Client calculates required changes
+For a configuration update, the Aurora Scheduler calculates required changes
 by examining the current job config state and the new desired job config.
 It then starts a *rolling batched update process* by going through every batch
 and performing these operations:
@@ -44,14 +44,13 @@ and performing these operations:
 - If an instance is not present in the scheduler but is present in
   the new config, then the instance is created.
 - If an instance is present in both the scheduler and the new config, then
-  the client diffs both task configs. If it detects any changes, it
+  the scheduler diffs both task configs. If it detects any changes, it
   performs an instance update by killing the old config instance and adds
   the new config instance.
 
-The Aurora client continues through the instance list until all tasks are
-updated, in `RUNNING,` and healthy for a configurable amount of time.
-If the client determines the update is not going well (a percentage of health
-checks have failed), it cancels the update.
+The Aurora Scheduler continues through the instance list until all tasks are
+updated and in `RUNNING`. If the scheduler determines the update is not going
+well (based on the criteria specified in the UpdateConfig), it cancels the update.
 
 Update cancellation runs a procedure similar to the described above
 update sequence, but in reverse order. New instance configs are swapped
@@ -59,7 +58,7 @@ with old instance configs and batch updates proceed backwards
 from the point where the update failed. E.g.; (0,1,2) (3,4,5) (6,7,
 8-FAIL) results in a rollback in order (8,7,6) (5,4,3) (2,1,0).
 
-For details how to control a job update, please see the
+For details on how to control a job update, please see the
 [UpdateConfig](../reference/configuration.md#updateconfig-objects) configuration object.
 
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/docs/features/services.md
----------------------------------------------------------------------
diff --git a/docs/features/services.md b/docs/features/services.md
index b6bfc9d..50189ee 100644
--- a/docs/features/services.md
+++ b/docs/features/services.md
@@ -90,6 +90,23 @@ Please see the
 [configuration reference](../reference/configuration.md#user-content-healthcheckconfig-objects)
 for configuration options for this feature.
 
+Starting with the 0.17.0 release, job updates can rely solely on task health checks, via a new
+`min_consecutive_successes` parameter on the HealthCheckConfig object. This parameter represents
+the number of successful health checks needed before a task is moved into the `RUNNING` state. Tasks
+that do not have enough successful health checks within the first `n` attempts are moved to the
+`FAILED` state, where `n = ceil(initial_interval_secs/interval_secs) + min_consecutive_successes`.
+In order to accommodate variability during task warm-up, `initial_interval_secs` acts
+as a grace period. Any health-check failures during the first `m` attempts are ignored and
+do not count towards `max_consecutive_failures`, where `m = ceil(initial_interval_secs/interval_secs)`.
+
+As [job updates](job-updates.md) are based only on health checks, it is not necessary to set
+`watch_secs` to the worst-case update time; it can instead be set to 0. The scheduler considers a
+task that is in the `RUNNING` state to be healthy and proceeds to update the next batch of instances.
+For details on how to control health checks, please see the
+[HealthCheckConfig](../reference/configuration.md#healthcheckconfig-objects) configuration object.
+Existing jobs that do not configure a health check can fall back to using `watch_secs` to
+monitor a task before considering it healthy.
+
 You can pause health checking by touching a file inside of your sandbox, named `.healthchecksnooze`.
 As long as that file is present, health checks will be disabled, enabling users to gather core
 dumps or other performance measurements without worrying about Aurora's health check killing
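
To make the attempt arithmetic in the services.md addition above concrete, here is a small
worked example (a sketch, not part of the commit) using the HealthCheckConfig defaults:

# Worked example of the `m`/`n` bounds described above, with the defaults
# initial_interval_secs=15.0, interval_secs=10.0, min_consecutive_successes=1.
import math

initial_interval_secs = 15.0
interval_secs = 10.0
min_consecutive_successes = 1

m = int(math.ceil(initial_interval_secs / interval_secs))  # 2 attempts with failures ignored
n = m + min_consecutive_successes                          # 3 attempts to reach RUNNING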

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/docs/reference/configuration.md
----------------------------------------------------------------------
diff --git a/docs/reference/configuration.md b/docs/reference/configuration.md
index f2a0b18..6c71142 100644
--- a/docs/reference/configuration.md
+++ b/docs/reference/configuration.md
@@ -379,9 +379,10 @@ Parameters for controlling a task's health checks via HTTP or a shell command.
 | param                          | type      | description
 | -------                        | :-------: | --------
 | ```health_checker```           | HealthCheckerConfig | Configure what kind of health check to use.
-| ```initial_interval_secs```    | Integer   | Initial delay for performing a health check. (Default: 15)
+| ```initial_interval_secs```    | Integer   | Initial grace period (during which health-check failures are ignored) while performing health checks. (Default: 15)
 | ```interval_secs```            | Integer   | Interval on which to check the task's health. (Default: 10)
 | ```max_consecutive_failures``` | Integer   | Maximum number of consecutive failures that will be tolerated before considering a task unhealthy (Default: 0)
+| ```min_consecutive_successes``` | Integer   | Minimum number of consecutive successful health checks required before considering a task healthy (Default: 1)
 | ```timeout_secs```             | Integer   | Health check timeout. (Default: 1)
 
 ### HealthCheckerConfig Objects

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/docs/reference/task-lifecycle.md
----------------------------------------------------------------------
diff --git a/docs/reference/task-lifecycle.md b/docs/reference/task-lifecycle.md
index 4dcb481..cf1b679 100644
--- a/docs/reference/task-lifecycle.md
+++ b/docs/reference/task-lifecycle.md
@@ -35,7 +35,9 @@ the `Task` goes into `STARTING` state.
 `STARTING` state initializes a `Task` sandbox. When the sandbox is fully
 initialized, Thermos begins to invoke `Process`es. Also, the agent
 machine sends an update to the scheduler that the `Task` is
-in `RUNNING` state.
+in `RUNNING` state, but only after the task satisfies the liveness requirements.
+See [Health Checking](../features/services.md#health-checking) for more details
+on how to configure health checks.
 
 
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/client/api/updater_util.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/api/updater_util.py b/src/main/python/apache/aurora/client/api/updater_util.py
index c649316..4e39862 100644
--- a/src/main/python/apache/aurora/client/api/updater_util.py
+++ b/src/main/python/apache/aurora/client/api/updater_util.py
@@ -35,8 +35,8 @@ class UpdaterConfig(object):
 
     if batch_size <= 0:
       raise ValueError('Batch size should be greater than 0')
-    if watch_secs <= 0:
-      raise ValueError('Watch seconds should be greater than 0')
+    if watch_secs < 0:
+      raise ValueError('Watch seconds should be greater than or equal to 0')
     if pulse_interval_secs is not None and pulse_interval_secs < self.MIN_PULSE_INTERVAL_SECONDS:
       raise ValueError('Pulse interval seconds must be at least %s seconds.'
                        % self.MIN_PULSE_INTERVAL_SECONDS)

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/client/config.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/client/config.py b/src/main/python/apache/aurora/client/config.py
index 0186af5..70c2c98 100644
--- a/src/main/python/apache/aurora/client/config.py
+++ b/src/main/python/apache/aurora/client/config.py
@@ -84,11 +84,8 @@ Based on your job size (%s) you should use max_total_failures >= %s.
 '''
 
 
-WATCH_SECS_INSUFFICIENT_ERROR_FORMAT = '''
-You have specified an insufficiently short watch period (%d seconds) in your update configuration.
-Your update will always succeed. In order for the updater to detect health check failures,
-UpdateConfig.watch_secs must be greater than %d seconds to account for an initial
-health check interval (%d seconds) plus %d consecutive failures at a check interval of %d seconds.
+INVALID_VALUE_ERROR_FORMAT = '''
+Invalid value (%s) specified for %s. Value cannot be less than 0.
 '''
 
 
@@ -101,6 +98,7 @@ def _validate_update_config(config):
   watch_secs = update_config.watch_secs().get()
   initial_interval_secs = health_check_config.initial_interval_secs().get()
   max_consecutive_failures = health_check_config.max_consecutive_failures().get()
+  min_consecutive_successes = health_check_config.min_consecutive_successes().get()
   interval_secs = health_check_config.interval_secs().get()
 
   if max_failures >= job_size:
@@ -111,10 +109,17 @@ def _validate_update_config(config):
     if max_failures < min_failure_threshold:
       die(UPDATE_CONFIG_DEDICATED_THRESHOLD_ERROR % (job_size, min_failure_threshold))
 
-  target_watch = initial_interval_secs + (max_consecutive_failures * interval_secs)
-  if watch_secs <= target_watch:
-    die(WATCH_SECS_INSUFFICIENT_ERROR_FORMAT %
-        (watch_secs, target_watch, initial_interval_secs, max_consecutive_failures, interval_secs))
+  params = [
+        (watch_secs, 'watch_secs'),
+        (max_consecutive_failures, 'max_consecutive_failures'),
+        (min_consecutive_successes, 'min_consecutive_successes'),
+        (initial_interval_secs, 'initial_interval_secs'),
+        (interval_secs, 'interval_secs')
+      ]
+
+  for value, name in params:
+    if value < 0:
+      die(INVALID_VALUE_ERROR_FORMAT % (value, name))
 
 
 PRODUCTION_DEPRECATED_WARNING = (

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/config/schema/base.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/config/schema/base.py b/src/main/python/apache/aurora/config/schema/base.py
index 8451630..b15b939 100644
--- a/src/main/python/apache/aurora/config/schema/base.py
+++ b/src/main/python/apache/aurora/config/schema/base.py
@@ -56,11 +56,12 @@ DefaultHealthChecker      = HealthCheckerConfig(http=HttpHealthChecker())
 
 
 class HealthCheckConfig(Struct):
-  health_checker           = Default(HealthCheckerConfig, DefaultHealthChecker)
-  initial_interval_secs    = Default(Float, 15.0)
-  interval_secs            = Default(Float, 10.0)
-  max_consecutive_failures = Default(Integer, 0)
-  timeout_secs             = Default(Float, 1.0)
+  health_checker            = Default(HealthCheckerConfig, DefaultHealthChecker)
+  initial_interval_secs     = Default(Float, 15.0)
+  interval_secs             = Default(Float, 10.0)
+  max_consecutive_failures  = Default(Integer, 0)
+  min_consecutive_successes = Default(Integer, 1)
+  timeout_secs              = Default(Float, 1.0)
 
 
 class HttpLifecycleConfig(Struct):

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/executor/aurora_executor.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/aurora_executor.py b/src/main/python/apache/aurora/executor/aurora_executor.py
index aee5e56..d01fcb9 100644
--- a/src/main/python/apache/aurora/executor/aurora_executor.py
+++ b/src/main/python/apache/aurora/executor/aurora_executor.py
@@ -115,8 +115,6 @@ class AuroraExecutor(ExecutorBase, Observable):
     if not self._start_runner(driver, assigned_task):
       return
 
-    self.send_update(driver, self._task_id, mesos_pb2.TASK_RUNNING)
-
     try:
       self._start_status_manager(driver, assigned_task)
     except Exception:
@@ -179,10 +177,17 @@ class AuroraExecutor(ExecutorBase, Observable):
     # chain the runner to the other checkers, but do not chain .start()/.stop()
     complete_checker = ChainedStatusChecker([self._runner, self._chained_checker])
     self._status_manager = self._status_manager_class(
-        complete_checker, self._shutdown, clock=self._clock)
+        complete_checker,
+        self._signal_running,
+        self._shutdown,
+        clock=self._clock)
     self._status_manager.start()
     self.status_manager_started.set()
 
+  def _signal_running(self, reason):
+    log.info('Send TASK_RUNNING status update. reason: %s' % reason)
+    self.send_update(self._driver, self._task_id, mesos_pb2.TASK_RUNNING, reason)
+
   def _signal_kill_manager(self, driver, task_id, reason):
     if self._task_id is None:
       log.error('Was asked to kill task but no task running!')

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/executor/common/health_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py
index 3c7c09d..12af9d8 100644
--- a/src/main/python/apache/aurora/executor/common/health_checker.py
+++ b/src/main/python/apache/aurora/executor/common/health_checker.py
@@ -12,6 +12,7 @@
 # limitations under the License.
 #
 
+import math
 import os
 import pwd
 import threading
@@ -43,14 +44,22 @@ class ThreadedHealthChecker(ExceptionalThread):
     health_checker should be a callable returning a tuple of (boolean, reason), indicating
     respectively the health of the service and the reason for its failure (or None if the service is
     still healthy).
+
+    Health-check failures are ignored during the first `math.ceil(grace_period_secs/interval_secs)`
+    attempts. Status becomes `TASK_RUNNING` once `min_consecutive_successes` consecutive health
+    check successes are seen within `math.ceil(grace_period_secs/interval_secs) +
+    min_consecutive_successes` attempts. (Counting attempts rather than elapsed time ensures that
+    slight discrepancies in sleep intervals neither cost an attempt nor prematurely end health
+    checking by marking the task unhealthy.)
   """
 
   def __init__(self,
       health_checker,
       sandbox,
       interval_secs,
-      initial_interval_secs,
+      grace_period_secs,
       max_consecutive_failures,
+      min_consecutive_successes,
       clock):
     """
     :param health_checker: health checker to confirm service health
@@ -59,10 +68,12 @@ class ThreadedHealthChecker(ExceptionalThread):
     :type sandbox: DirectorySandbox
     :param interval_secs: delay between checks
     :type interval_secs: int
-    :param initial_interval_secs: seconds to wait before starting checks
-    :type initial_interval_secs: int
+    :param grace_period_secs: initial period during which failed health-checks are ignored
+    :type grace_period_secs: int
     :param max_consecutive_failures: number of failures to allow before marking dead
     :type max_consecutive_failures: int
+    :param min_consecutive_successes: number of successes needed before marking healthy
+    :type min_consecutive_successes: int
     :param clock: time module available to be mocked for testing
     :type clock: time module
     """
@@ -70,24 +81,33 @@ class ThreadedHealthChecker(ExceptionalThread):
     self.sandbox = sandbox
     self.clock = clock
     self.current_consecutive_failures = 0
+    self.current_consecutive_successes = 0
     self.dead = threading.Event()
     self.interval = interval_secs
     self.max_consecutive_failures = max_consecutive_failures
+    self.min_consecutive_successes = min_consecutive_successes
     self.snooze_file = None
     self.snoozed = False
 
     if self.sandbox and self.sandbox.exists():
       self.snooze_file = os.path.join(self.sandbox.root, '.healthchecksnooze')
 
-    if initial_interval_secs is not None:
-      self.initial_interval = initial_interval_secs
+    if grace_period_secs is not None:
+      self.grace_period_secs = grace_period_secs
     else:
-      self.initial_interval = interval_secs * 2
+      self.grace_period_secs = interval_secs * 2
+
+    self.attempts = 0
+    # Compute the number of attempts that can be fit into the grace_period_secs,
+    # to guarantee the number of health checks during the grace period.
+    # Relying on time might cause non-deterministic behavior since the
+    # health checks can be spaced apart by interval_secs + epsilon.
+    self.forgiving_attempts = math.ceil(self.grace_period_secs / self.interval)
+
+    self.max_attempts_to_running = self.forgiving_attempts + self.min_consecutive_successes
+    self.running = False
+    self.healthy, self.reason = True, None
 
-    if self.initial_interval > 0:
-      self.healthy, self.reason = True, None
-    else:
-      self.healthy, self.reason = self._perform_check_if_not_disabled()
     super(ThreadedHealthChecker, self).__init__()
     self.daemon = True
 
@@ -107,27 +127,80 @@ class ThreadedHealthChecker(ExceptionalThread):
       log.error(traceback.format_exc())
       return False, 'Internal health check error: %s' % e
 
-  def _maybe_update_failure_count(self, is_healthy, reason):
+  def _maybe_update_health_check_count(self, is_healthy, reason):
     if not is_healthy:
       log.warning('Health check failure: %s' % reason)
+
+      if self.current_consecutive_successes > 0:
+        log.debug('Reset consecutive successes counter.')
+        self.current_consecutive_successes = 0
+
+      if self._should_ignore_failure():
+        return
+
+      if self._should_fail_fast():
+        log.warning('Not enough attempts left to prove health, failing fast.')
+        self.healthy = False
+        self.reason = reason
+
       self.current_consecutive_failures += 1
       if self.current_consecutive_failures > self.max_consecutive_failures:
         log.warning('Reached consecutive failure limit.')
         self.healthy = False
         self.reason = reason
     else:
+      self.current_consecutive_successes += 1
+
+      if not self.running:
+        if self.current_consecutive_successes >= self.min_consecutive_successes:
+          log.info('Reached consecutive success limit.')
+          self.running = True
+
       if self.current_consecutive_failures > 0:
         log.debug('Reset consecutive failures counter.')
-      self.current_consecutive_failures = 0
+        self.current_consecutive_failures = 0
+
+  def _should_fail_fast(self):
+    if not self.running:
+      attempts_remaining = self.max_attempts_to_running - self.attempts
+      successes_needed = self.min_consecutive_successes - self.current_consecutive_successes
+      if successes_needed > attempts_remaining:
+        return True
+    return False
+
+  def _should_ignore_failure(self):
+    if not self.running:
+      if self.attempts <= self.forgiving_attempts:
+        log.warning('Ignoring failure of attempt: %s' % self.attempts)
+        return True
+    return False
+
+  def _should_enforce_deadline(self):
+    if not self.running:
+      if self.attempts > self.max_attempts_to_running:
+        return True
+    return False
+
+  def _do_health_check(self):
+    if self._should_enforce_deadline():
+      # This is needed otherwise it is possible to flap between
+      # successful health-checks and failed health-checks, never
+      # really satisfying the criteria for either healthy or unhealthy.
+      log.warning('Exhausted attempts before satisfying liveness criteria.')
+      self.healthy = False
+      self.reason = 'Not enough successful health checks in time.'
+      return self.healthy, self.reason
+
+    is_healthy, reason = self._perform_check_if_not_disabled()
+    if not self.running:
+      self.attempts += 1
+    self._maybe_update_health_check_count(is_healthy, reason)
+    return is_healthy, reason
 
   def run(self):
     log.debug('Health checker thread started.')
-    if self.initial_interval > 0:
-      self.clock.sleep(self.initial_interval)
-    log.debug('Initial interval expired.')
     while not self.dead.is_set():
-      is_healthy, reason = self._perform_check_if_not_disabled()
-      self._maybe_update_failure_count(is_healthy, reason)
+      is_healthy, reason = self._do_health_check()
       self.clock.sleep(self.interval)
 
   def start(self):
@@ -158,8 +231,9 @@ class HealthChecker(StatusChecker):
                health_checker,
                sandbox=None,
                interval_secs=10,
-               initial_interval_secs=None,
+               grace_period_secs=None,
                max_consecutive_failures=0,
+               min_consecutive_successes=1,
                clock=time):
     self._health_checks = 0
     self._total_latency = 0
@@ -169,8 +243,9 @@ class HealthChecker(StatusChecker):
         self._timing_wrapper(health_checker),
         sandbox,
         interval_secs,
-        initial_interval_secs,
+        grace_period_secs,
         max_consecutive_failures,
+        min_consecutive_successes,
         clock)
     self.metrics.register(LambdaGauge('consecutive_failures',
         lambda: self.threaded_health_checker.current_consecutive_failures))
@@ -192,9 +267,13 @@ class HealthChecker(StatusChecker):
 
   @property
   def status(self):
-    if not self.threaded_health_checker.healthy:
-      return StatusResult('Failed health check! %s' % self.threaded_health_checker.reason,
-          TaskState.Value('TASK_FAILED'))
+    if self.threaded_health_checker.healthy:
+      if self.threaded_health_checker.running:
+        return StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING'))
+      else:
+        return StatusResult(None, TaskState.Value('TASK_STARTING'))
+    return StatusResult('Failed health check! %s' % self.threaded_health_checker.reason,
+        TaskState.Value('TASK_FAILED'))
 
   def name(self):
     return 'health_checker'
@@ -207,6 +286,22 @@ class HealthChecker(StatusChecker):
     self.threaded_health_checker.stop()
 
 
+class NoopHealthChecker(StatusChecker):
+  """
+     A health checker that will always report healthy status. This will be the
+     stand-in health checker when no health checker is configured. Since there is
+     no liveness requirement specified, the status is always `TASK_RUNNING`.
+  """
+
+  def __init__(self):
+    self._status = StatusResult('No health-check defined, task is assumed healthy.',
+        TaskState.Value('TASK_RUNNING'))
+
+  @property
+  def status(self):
+    return self._status
+
+
 class HealthCheckerProvider(StatusCheckerProvider):
 
   def __init__(self, nosetuid_health_checks=False, mesos_containerizer_path=None):
@@ -282,7 +377,8 @@ class HealthCheckerProvider(StatusCheckerProvider):
     else:
       portmap = resolve_ports(mesos_task, assigned_task.assignedPorts)
       if 'health' not in portmap:
-        return None
+        log.warning('No health-checks defined, will use a no-op health-checker.')
+        return NoopHealthChecker()
       http_config = health_checker.get(HTTP_HEALTH_CHECK, {})
       http_endpoint = http_config.get('endpoint')
       http_expected_response = http_config.get('expected_response')
@@ -301,7 +397,8 @@ class HealthCheckerProvider(StatusCheckerProvider):
       a_health_checker,
       sandbox,
       interval_secs=health_check_config.get('interval_secs'),
-      initial_interval_secs=health_check_config.get('initial_interval_secs'),
-      max_consecutive_failures=health_check_config.get('max_consecutive_failures'))
+      grace_period_secs=health_check_config.get('initial_interval_secs'),
+      max_consecutive_failures=health_check_config.get('max_consecutive_failures'),
+      min_consecutive_successes=health_check_config.get('min_consecutive_successes'))
 
     return health_checker
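
As a standalone illustration of the fail-fast condition added above, the helper below
(hypothetical, not part of the module) reproduces the `_should_fail_fast` arithmetic in
isolation, under assumed values of forgiving_attempts=2 and min_consecutive_successes=2:

# Sketch of the _should_fail_fast arithmetic with max_attempts_to_running=4
# (forgiving_attempts=2 + min_consecutive_successes=2). Once the successes
# still needed exceed the attempts remaining, the checker gives up instead of
# waiting for max_consecutive_failures to be reached.
def should_fail_fast(attempts, consecutive_successes,
                     max_attempts_to_running=4, min_consecutive_successes=2):
  attempts_remaining = max_attempts_to_running - attempts
  successes_needed = min_consecutive_successes - consecutive_successes
  return successes_needed > attempts_remaining

assert should_fail_fast(attempts=3, consecutive_successes=0)      # cannot recover in time
assert not should_fail_fast(attempts=2, consecutive_successes=1)  # still possible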

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/executor/common/status_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/status_checker.py b/src/main/python/apache/aurora/executor/common/status_checker.py
index 795dae2..f278825 100644
--- a/src/main/python/apache/aurora/executor/common/status_checker.py
+++ b/src/main/python/apache/aurora/executor/common/status_checker.py
@@ -49,6 +49,11 @@ class StatusResult(object):
         self._reason,
         TaskState.Name(self._status))
 
+  def __eq__(self, other):
+    if isinstance(other, StatusResult):
+      return self._status == other._status and self._reason == other._reason
+    return False
+
 
 class StatusChecker(Observable, Interface):
   """Interface to pluggable status checkers for the Aurora Executor."""
@@ -73,6 +78,13 @@ class StatusChecker(Observable, Interface):
 class StatusCheckerProvider(Interface):
   @abstractmethod
   def from_assigned_task(self, assigned_task, sandbox):
+    """
+    :param assigned_task:
+    :type assigned_task: AssignedTask
+    :param sandbox: Sandbox of the task corresponding to this status check.
+    :type sandbox: DirectorySandbox
+    :return: Instance of a HealthChecker.
+    """
     pass
 
 
@@ -92,18 +104,43 @@ class ChainedStatusChecker(StatusChecker):
 
   @property
   def status(self):
-    if self._status is None:
+    """
+      Return status that is computed from the statuses of the StatusCheckers. The computed status
+      is based on the priority given below (in increasing order of priority).
+
+      None             -> healthy (lowest-priority)
+      TASK_RUNNING     -> healthy and running
+      TASK_STARTING    -> healthy but still in starting
+      Otherwise        -> unhealthy (highest-priority)
+    """
+    if not self._in_terminal_state():
+      cur_status = None
       for status_checker in self._status_checkers:
-        status_checker_status = status_checker.status
-        if status_checker_status is not None:
-          log.info('%s reported %s' % (status_checker.__class__.__name__, status_checker_status))
-          if not isinstance(status_checker_status, StatusResult):
+        status_result = status_checker.status
+        if status_result is not None:
+          log.info('%s reported %s' % (status_checker.__class__.__name__, status_result))
+          if not isinstance(status_result, StatusResult):
             raise TypeError('StatusChecker returned something other than a StatusResult: got %s' %
-                type(status_checker_status))
-          self._status = status_checker_status
-          break
+                type(status_result))
+          if status_result.status == TaskState.Value('TASK_STARTING'):
+            # TASK_STARTING overrides other statuses
+            cur_status = status_result
+          elif status_result.status == TaskState.Value('TASK_RUNNING'):
+            if cur_status is None or cur_status == TaskState.Value('TASK_RUNNING'):
+              # TASK_RUNNING needs consensus (None is also included)
+              cur_status = status_result
+          else:
+            # Any other status leads to a terminal state
+            self._status = status_result
+            return self._status
+      self._status = cur_status
     return self._status
 
+  def _in_terminal_state(self):
+    return (self._status is not None and
+        self._status.status != TaskState.Value('TASK_RUNNING') and
+        self._status.status != TaskState.Value('TASK_STARTING'))
+
   def start(self):
     for status_checker in self._status_checkers:
       status_checker.start()
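
The priority rules in `ChainedStatusChecker.status` can be summarized with a small sketch
(a hypothetical helper, not part of the module), using strings as stand-ins for TaskState
values:

# Sketch of how ChainedStatusChecker combines per-checker statuses, per the
# priority order documented in its docstring. None means a checker has no
# opinion yet.
def combine(statuses):
  cur = None
  for s in statuses:
    if s is None:
      continue
    if s == 'TASK_STARTING':
      cur = s                                       # STARTING overrides RUNNING/None
    elif s == 'TASK_RUNNING':
      if cur is None or cur == 'TASK_RUNNING':
        cur = s                                     # RUNNING needs consensus
    else:
      return s                                      # any other status is terminal
  return cur

assert combine([None, 'TASK_RUNNING']) == 'TASK_RUNNING'
assert combine(['TASK_RUNNING', 'TASK_STARTING']) == 'TASK_STARTING'
assert combine(['TASK_STARTING', 'TASK_FAILED']) == 'TASK_FAILED'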

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/main/python/apache/aurora/executor/status_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/status_manager.py b/src/main/python/apache/aurora/executor/status_manager.py
index 228a99a..8b536a9 100644
--- a/src/main/python/apache/aurora/executor/status_manager.py
+++ b/src/main/python/apache/aurora/executor/status_manager.py
@@ -14,6 +14,7 @@
 
 import time
 
+from mesos.interface.mesos_pb2 import TaskState
 from twitter.common import log
 from twitter.common.exceptions import ExceptionalThread
 from twitter.common.quantity import Amount, Time
@@ -26,18 +27,24 @@ class StatusManager(ExceptionalThread):
     An agent that periodically checks the health of a task via StatusCheckers that
     provide HTTP health checking, resource consumption, etc.
 
-    If any of the status interfaces return a status, the Status Manager
-    invokes the user-supplied callback with the status.
+    Invokes the user-supplied `running_callback` with the status the first time the
+    StatusChecker reports `TASK_RUNNING`; the callback is invoked at most once. For any
+    other non-None status except `TASK_STARTING`, invokes the `unhealthy_callback` and
+    terminates.
   """
   POLL_WAIT = Amount(500, Time.MILLISECONDS)
 
-  def __init__(self, status_checker, callback, clock=time):
+  def __init__(self, status_checker, running_callback, unhealthy_callback, clock=time):
     if not isinstance(status_checker, StatusChecker):
       raise TypeError('status_checker must be a StatusChecker, got %s' % type(status_checker))
-    if not callable(callback):
-      raise TypeError('callback needs to be callable!')
+    if not callable(running_callback):
+      raise TypeError('running_callback needs to be callable!')
+    if not callable(unhealthy_callback):
+      raise TypeError('unhealthy_callback needs to be callable!')
     self._status_checker = status_checker
-    self._callback = callback
+    self._running_callback = running_callback
+    self._running_callback_dispatched = False
+    self._unhealthy_callback = unhealthy_callback
     self._clock = clock
     super(StatusManager, self).__init__()
     self.daemon = True
@@ -47,7 +54,11 @@ class StatusManager(ExceptionalThread):
       status_result = self._status_checker.status
       if status_result is not None:
         log.info('Status manager got %s' % status_result)
-        self._callback(status_result)
-        break
-      else:
-        self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS))
+        if status_result.status == TaskState.Value('TASK_RUNNING'):
+          if not self._running_callback_dispatched:
+            self._running_callback(status_result)
+            self._running_callback_dispatched = True
+        elif status_result.status != TaskState.Value('TASK_STARTING'):
+          self._unhealthy_callback(status_result)
+          break
+      self._clock.sleep(self.POLL_WAIT.as_(Time.SECONDS))

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/test/python/apache/aurora/client/cli/test_inspect.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/cli/test_inspect.py b/src/test/python/apache/aurora/client/cli/test_inspect.py
index 7ef682d..4a23c59 100644
--- a/src/test/python/apache/aurora/client/cli/test_inspect.py
+++ b/src/test/python/apache/aurora/client/cli/test_inspect.py
@@ -99,7 +99,8 @@ Process 'process':
                     "expected_response": "ok"}},
             "interval_secs": 10.0,
             "timeout_secs": 1.0,
-            "max_consecutive_failures": 0},
+            "max_consecutive_failures": 0,
+            "min_consecutive_successes": 1},
         "cluster": "west",
         "cron_schedule": "* * * * *",
         "service": False,

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/test/python/apache/aurora/client/test_config.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/client/test_config.py b/src/test/python/apache/aurora/client/test_config.py
index 5cf68a5..042372e 100644
--- a/src/test/python/apache/aurora/client/test_config.py
+++ b/src/test/python/apache/aurora/client/test_config.py
@@ -192,21 +192,42 @@ def test_update_config_passes_with_default_values():
   config._validate_update_config(AuroraConfig(base_job))
 
 
-def test_update_config_passes_with_min_requirement_values():
+def test_update_config_passes_with_max_consecutive_failures_zero():
+  base_job = Job(
+    name='hello_world', role='john_doe', cluster='test-cluster',
+    health_check_config=HealthCheckConfig(max_consecutive_failures=0),
+    task=Task(name='main', processes=[],
+              resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB)))
+
+  config._validate_update_config(AuroraConfig(base_job))
+
+
+def test_update_config_fails_with_max_consecutive_failures_negative():
   base_job = Job(
     name='hello_world', role='john_doe', cluster='test-cluster',
     update_config=UpdateConfig(watch_secs=26),
-    health_check_config=HealthCheckConfig(max_consecutive_failures=1),
+    health_check_config=HealthCheckConfig(max_consecutive_failures=-1),
+    task=Task(name='main', processes=[],
+              resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB)))
+
+  with pytest.raises(SystemExit):
+    config._validate_update_config(AuroraConfig(base_job))
+
+
+def test_update_config_passes_with_min_consecutive_successes_zero():
+  base_job = Job(
+    name='hello_world', role='john_doe', cluster='test-cluster',
+    health_check_config=HealthCheckConfig(min_consecutive_successes=0),
     task=Task(name='main', processes=[],
               resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB)))
 
   config._validate_update_config(AuroraConfig(base_job))
 
 
-def test_update_config_fails_insufficient_watch_secs_less_than_target():
+def test_update_config_fails_with_min_consecutive_successes_negative():
   base_job = Job(
     name='hello_world', role='john_doe', cluster='test-cluster',
-    update_config=UpdateConfig(watch_secs=10),
+    health_check_config=HealthCheckConfig(min_consecutive_successes=-1),
     task=Task(name='main', processes=[],
               resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB)))
 
@@ -214,11 +235,20 @@ def test_update_config_fails_insufficient_watch_secs_less_than_target():
     config._validate_update_config(AuroraConfig(base_job))
 
 
-def test_update_config_fails_insufficient_watch_secs_equal_to_target():
+def test_update_config_passes_with_watch_secs_zero():
+  base_job = Job(
+    name='hello_world', role='john_doe', cluster='test-cluster',
+    update_config=UpdateConfig(watch_secs=0),
+    task=Task(name='main', processes=[],
+              resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB)))
+
+  config._validate_update_config(AuroraConfig(base_job))
+
+
+def test_update_config_fails_watch_secs_negative():
   base_job = Job(
     name='hello_world', role='john_doe', cluster='test-cluster',
-    update_config=UpdateConfig(watch_secs=25),
-    health_check_config=HealthCheckConfig(max_consecutive_failures=1),
+    update_config=UpdateConfig(watch_secs=-1),
     task=Task(name='main', processes=[],
               resources=Resources(cpu=0.1, ram=64 * MB, disk=64 * MB)))
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/test/python/apache/aurora/executor/common/test_health_checker.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py
index da0c56c..e2a7f16 100644
--- a/src/test/python/apache/aurora/executor/common/test_health_checker.py
+++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py
@@ -34,9 +34,10 @@ from apache.aurora.config.schema.base import (
 from apache.aurora.executor.common.health_checker import (
     HealthChecker,
     HealthCheckerProvider,
-    ThreadedHealthChecker
+    NoopHealthChecker
 )
 from apache.aurora.executor.common.sandbox import SandboxInterface
+from apache.aurora.executor.common.status_checker import StatusResult
 
 from .fixtures import HELLO_WORLD, MESOS_JOB
 
@@ -47,7 +48,8 @@ class TestHealthChecker(unittest.TestCase):
   def setUp(self):
     self._clock = ThreadedClock(0)
     self._checker = mock.Mock(spec=HttpSignaler)
-
+    self.initial_interval_secs = 15
+    self.interval_secs = 10
     self.fake_health_checks = []
     def mock_health_check():
       return self.fake_health_checks.pop(0)
@@ -58,101 +60,193 @@ class TestHealthChecker(unittest.TestCase):
     for i in range(num_calls):
       self.fake_health_checks.append((status, 'reason'))
 
-  def test_initial_interval_2x(self):
+  def test_grace_period_2x_success(self):
+    '''Grace period is 2 x interval and health checks succeed.'''
+
+    self.append_health_checks(True, num_calls=2)
+    hct = HealthChecker(
+              self._checker.health,
+              interval_secs=self.interval_secs,
+              clock=self._clock)
+    hct.start()
+    assert self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
+    assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING'))
+    assert hct.threaded_health_checker.running is True
+    hct.stop()
+    assert self._checker.health.call_count == 1
+
+  def test_grace_period_2x_failure_then_success(self):
+    '''Grace period is 2 x interval and health checks fail then succeed.'''
+
     self.append_health_checks(False)
-    hct = HealthChecker(self._checker.health, interval_secs=5, clock=self._clock)
+    self.append_health_checks(True)
+    hct = HealthChecker(
+              self._checker.health,
+              interval_secs=self.interval_secs,
+              clock=self._clock)
+    hct.start()
+    assert self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
+    assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING'))
+    assert hct.threaded_health_checker.running is False
+    self._clock.tick(self.interval_secs)
+    assert self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
+    assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING'))
+    assert hct.threaded_health_checker.running is True
+    hct.stop()
+    assert self._checker.health.call_count == 2
+
+  def test_grace_period_2x_failure(self):
+    '''
+      Grace period is 2 x interval and all health checks fail.
+      Failures are ignored when in grace period.
+    '''
+
+    self.append_health_checks(False, num_calls=3)
+    hct = HealthChecker(
+              self._checker.health,
+              interval_secs=self.interval_secs,
+              clock=self._clock)
     hct.start()
     assert self._clock.converge(threads=[hct.threaded_health_checker])
-    self._clock.assert_waiting(hct.threaded_health_checker, 10)
-    assert hct.status is None
-    self._clock.tick(6)
+    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
+    assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING'))
+    assert hct.threaded_health_checker.running is False
+    self._clock.tick(self.interval_secs)
     assert self._clock.converge(threads=[hct.threaded_health_checker])
-    assert hct.status is None
-    self._clock.tick(3)
+    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
+    assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING'))
+    assert hct.threaded_health_checker.running is False
+    self._clock.tick(self.interval_secs)
     assert self._clock.converge(threads=[hct.threaded_health_checker])
-    assert hct.status is None
-    self._clock.tick(5)
+    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
+    assert hct.status == StatusResult('Failed health check! reason', TaskState.Value('TASK_FAILED'))
+    assert hct.threaded_health_checker.running is False
+    hct.stop()
+    assert self._checker.health.call_count == 3
+
+  def test_success_outside_grace_period(self):
+    '''
+    Health checks fail inside the grace period but pass outside it, leading to success
+    '''
+
+    self.append_health_checks(False, num_calls=2)
+    self.append_health_checks(True)
+    hct = HealthChecker(
+              self._checker.health,
+              interval_secs=self.interval_secs,
+              clock=self._clock)
+    hct.start()
     assert self._clock.converge(threads=[hct.threaded_health_checker])
-    assert hct.status.status == TaskState.Value('TASK_FAILED')
+    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
+    assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING'))
+    assert hct.threaded_health_checker.running is False
+    self._clock.tick(self.interval_secs)
+    assert self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
+    assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING'))
+    assert hct.threaded_health_checker.running is False
+    self._clock.tick(self.interval_secs)
+    assert self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
+    assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING'))
+    assert hct.threaded_health_checker.running is True
     hct.stop()
-    assert self._checker.health.call_count == 1
+    assert self._checker.health.call_count == 3
 
   def test_initial_interval_whatev(self):
     self.append_health_checks(False, 2)
     hct = HealthChecker(
         self._checker.health,
-        interval_secs=5,
-        initial_interval_secs=0,
+        interval_secs=self.interval_secs,
+        grace_period_secs=0,
         clock=self._clock)
     hct.start()
     self._clock.converge(threads=[hct.threaded_health_checker])
-    self._clock.assert_waiting(hct.threaded_health_checker, amount=5)
-    assert hct.status.status == TaskState.Value('TASK_FAILED')
+    self._clock.assert_waiting(hct.threaded_health_checker, self.interval_secs)
+    assert hct.status == StatusResult('Failed health check! reason', TaskState.Value('TASK_FAILED'))
     hct.stop()
-    # this is an implementation detail -- we healthcheck in the initializer and
-    # healthcheck in the run loop.  if we ever change the implementation, expect
-    # this to break.
-    assert self._checker.health.call_count == 2
+    assert self._checker.health.call_count == 1
 
-  def test_consecutive_failures(self):
-    '''Verify that a task is unhealthy only after max_consecutive_failures is exceeded'''
-    initial_interval_secs = 2
-    interval_secs = 1
-    self.append_health_checks(False, num_calls=2)
-    self.append_health_checks(True)
+  def test_consecutive_failures_max_failures(self):
+    '''Verify that a task is unhealthy after max_consecutive_failures is exceeded'''
+    grace_period_secs = self.initial_interval_secs
+    interval_secs = self.interval_secs
+    self.append_health_checks(True, num_calls=2)
     self.append_health_checks(False, num_calls=3)
     hct = HealthChecker(
         self._checker.health,
         interval_secs=interval_secs,
-        initial_interval_secs=initial_interval_secs,
+        grace_period_secs=grace_period_secs,
         max_consecutive_failures=2,
+        min_consecutive_successes=2,
         clock=self._clock)
     hct.start()
-    self._clock.converge(threads=[hct.threaded_health_checker])
 
-    # 2 consecutive health check failures followed by a successful health check.
-    epsilon = 0.001
-    self._clock.tick(initial_interval_secs + epsilon)
     self._clock.converge(threads=[hct.threaded_health_checker])
-    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
-    assert hct.status is None
-    assert hct.metrics.sample()['consecutive_failures'] == 1
-    self._clock.tick(interval_secs + epsilon)
-    self._clock.converge(threads=[hct.threaded_health_checker])
-    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
-    assert hct.status is None
-    assert hct.metrics.sample()['consecutive_failures'] == 2
-    self._clock.tick(interval_secs + epsilon)
+    self._clock.assert_waiting(hct.threaded_health_checker, interval_secs)
+    assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING'))
+    assert hct.metrics.sample()['consecutive_failures'] == 0
+    self._clock.tick(interval_secs)
     self._clock.converge(threads=[hct.threaded_health_checker])
-    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
-    assert hct.status is None
+    self._clock.assert_waiting(hct.threaded_health_checker, interval_secs)
+    assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING'))
     assert hct.metrics.sample()['consecutive_failures'] == 0
-
-    # 3 consecutive health check failures.
-    self._clock.tick(interval_secs + epsilon)
+    assert hct.threaded_health_checker.running is True
+    self._clock.tick(interval_secs)
     self._clock.converge(threads=[hct.threaded_health_checker])
-    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
-    assert hct.status is None
+    self._clock.assert_waiting(hct.threaded_health_checker, interval_secs)
+    assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING'))
     assert hct.metrics.sample()['consecutive_failures'] == 1
-    self._clock.tick(interval_secs + epsilon)
+    self._clock.tick(interval_secs)
     self._clock.converge(threads=[hct.threaded_health_checker])
-    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
-    assert hct.status is None
+    self._clock.assert_waiting(hct.threaded_health_checker, interval_secs)
+    assert hct.status == StatusResult('Task is healthy.', TaskState.Value('TASK_RUNNING'))
     assert hct.metrics.sample()['consecutive_failures'] == 2
-    self._clock.tick(interval_secs + epsilon)
+    self._clock.tick(interval_secs)
     self._clock.converge(threads=[hct.threaded_health_checker])
-    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
-    assert hct.status.status == TaskState.Value('TASK_FAILED')
+    self._clock.assert_waiting(hct.threaded_health_checker, interval_secs)
+    assert hct.status == StatusResult('Failed health check! reason', TaskState.Value('TASK_FAILED'))
     assert hct.metrics.sample()['consecutive_failures'] == 3
     hct.stop()
-    assert self._checker.health.call_count == 6
+    assert self._checker.health.call_count == 5
+
+  def test_consecutive_failures_failfast(self):
+    '''Verify that health check is failed fast'''
+    grace_period_secs = self.initial_interval_secs
+    interval_secs = self.interval_secs
+    self.append_health_checks(False, num_calls=3)
+    hct = HealthChecker(
+        self._checker.health,
+        interval_secs=interval_secs,
+        grace_period_secs=grace_period_secs,
+        max_consecutive_failures=2,
+        min_consecutive_successes=2,
+        clock=self._clock)
+    hct.start()
+
+    # 3 consecutive health check failures causes fail-fast
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, interval_secs)
+    assert hct.status == StatusResult(None, TaskState.Value('TASK_STARTING'))
+    # failure is ignored inside grace_period_secs
+    assert hct.metrics.sample()['consecutive_failures'] == 0
+    self._clock.tick(interval_secs)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, interval_secs)
+    assert hct.status == StatusResult('Failed health check! reason', TaskState.Value('TASK_FAILED'))
+    assert hct.metrics.sample()['consecutive_failures'] == 1
+    hct.stop()
+    assert self._checker.health.call_count == 2
 
   @pytest.mark.skipif('True', reason='Flaky test (AURORA-1182)')
   def test_health_checker_metrics(self):
     def slow_check():
       self._clock.sleep(0.5)
       return (True, None)
-    hct = HealthChecker(slow_check, interval_secs=1, initial_interval_secs=1, clock=self._clock)
+    hct = HealthChecker(slow_check, interval_secs=1, grace_period_secs=1, clock=self._clock)
     hct.start()
     self._clock.converge(threads=[hct.threaded_health_checker])
     self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
@@ -193,6 +287,7 @@ class TestHealthCheckerProvider(unittest.TestCase):
     interval_secs = 17
     initial_interval_secs = 3
     max_consecutive_failures = 2
+    min_consecutive_successes = 2
     task_config = TaskConfig(
         executorConfig=ExecutorConfig(
             name='thermos',
@@ -202,6 +297,7 @@ class TestHealthCheckerProvider(unittest.TestCase):
                     interval_secs=interval_secs,
                     initial_interval_secs=initial_interval_secs,
                     max_consecutive_failures=max_consecutive_failures,
+                    min_consecutive_successes=min_consecutive_successes,
                     timeout_secs=7
                 )
             ).json_dumps()
@@ -209,15 +305,17 @@ class TestHealthCheckerProvider(unittest.TestCase):
     )
     assigned_task = AssignedTask(task=task_config, instanceId=1, assignedPorts={'health': 9001})
     health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, None)
-    assert health_checker.threaded_health_checker.interval == interval_secs
-    assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs
-    hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures
-    assert hct_max_fail == max_consecutive_failures
+    hc = health_checker.threaded_health_checker
+    assert hc.interval == interval_secs
+    assert hc.grace_period_secs == initial_interval_secs
+    assert hc.max_consecutive_failures == max_consecutive_failures
+    assert hc.min_consecutive_successes == min_consecutive_successes
 
   def test_from_assigned_task_http_endpoint_style_config(self):
     interval_secs = 17
     initial_interval_secs = 3
     max_consecutive_failures = 2
+    min_consecutive_successes = 2
     http_config = HttpHealthChecker(
       endpoint='/foo',
       expected_response='bar',
@@ -233,6 +331,7 @@ class TestHealthCheckerProvider(unittest.TestCase):
                     interval_secs=interval_secs,
                     initial_interval_secs=initial_interval_secs,
                     max_consecutive_failures=max_consecutive_failures,
+                    min_consecutive_successes=min_consecutive_successes,
                     timeout_secs=7
                 )
             ).json_dumps()
@@ -245,14 +344,18 @@ class TestHealthCheckerProvider(unittest.TestCase):
     assert http_exec_config['expected_response'] == 'bar'
     assert http_exec_config['expected_response_code'] == 201
     health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, None)
-    assert health_checker.threaded_health_checker.interval == interval_secs
-    assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs
+    hc = health_checker.threaded_health_checker
+    assert hc.interval == interval_secs
+    assert hc.grace_period_secs == initial_interval_secs
+    assert hc.max_consecutive_failures == max_consecutive_failures
+    assert hc.min_consecutive_successes == min_consecutive_successes
 
   @mock.patch('pwd.getpwnam')
   def test_from_assigned_task_shell(self, mock_getpwnam):
     interval_secs = 17
     initial_interval_secs = 3
     max_consecutive_failures = 2
+    min_consecutive_successes = 2
     timeout_secs = 5
     shell_config = ShellHealthChecker(shell_command='failed command')
     task_config = TaskConfig(
@@ -266,7 +369,8 @@ class TestHealthCheckerProvider(unittest.TestCase):
                     interval_secs=interval_secs,
                     initial_interval_secs=initial_interval_secs,
                     max_consecutive_failures=max_consecutive_failures,
-                    timeout_secs=timeout_secs,
+                    min_consecutive_successes=min_consecutive_successes,
+                    timeout_secs=timeout_secs
                 )
             ).json_dumps()
         )
@@ -281,10 +385,11 @@ class TestHealthCheckerProvider(unittest.TestCase):
     type(mock_sandbox).is_filesystem_image = mock.PropertyMock(return_value=False)
 
     health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, mock_sandbox)
-    assert health_checker.threaded_health_checker.interval == interval_secs
-    assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs
-    hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures
-    assert hct_max_fail == max_consecutive_failures
+    hc = health_checker.threaded_health_checker
+    assert hc.interval == interval_secs
+    assert hc.grace_period_secs == initial_interval_secs
+    assert hc.max_consecutive_failures == max_consecutive_failures
+    assert hc.min_consecutive_successes == min_consecutive_successes
     mock_getpwnam.assert_called_once_with(task_config.job.role)
 
   @mock.patch('pwd.getpwnam')
@@ -292,6 +397,7 @@ class TestHealthCheckerProvider(unittest.TestCase):
     interval_secs = 17
     initial_interval_secs = 3
     max_consecutive_failures = 2
+    min_consecutive_successes = 2
     timeout_secs = 5
     shell_config = ShellHealthChecker(shell_command='failed command')
     task_config = TaskConfig(
@@ -305,7 +411,8 @@ class TestHealthCheckerProvider(unittest.TestCase):
                     interval_secs=interval_secs,
                     initial_interval_secs=initial_interval_secs,
                     max_consecutive_failures=max_consecutive_failures,
-                    timeout_secs=timeout_secs,
+                    min_consecutive_successes=min_consecutive_successes,
+                    timeout_secs=timeout_secs
                 )
             ).json_dumps()
         )
@@ -322,10 +429,11 @@ class TestHealthCheckerProvider(unittest.TestCase):
     health_checker = HealthCheckerProvider(nosetuid_health_checks=True).from_assigned_task(
         assigned_task,
         mock_sandbox)
-    assert health_checker.threaded_health_checker.interval == interval_secs
-    assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs
-    hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures
-    assert hct_max_fail == max_consecutive_failures
+    hc = health_checker.threaded_health_checker
+    assert hc.interval == interval_secs
+    assert hc.grace_period_secs == initial_interval_secs
+    assert hc.max_consecutive_failures == max_consecutive_failures
+    assert hc.min_consecutive_successes == min_consecutive_successes
     # Should not be trying to access role's user info.
     assert not mock_getpwnam.called
 
@@ -334,6 +442,7 @@ class TestHealthCheckerProvider(unittest.TestCase):
     interval_secs = 17
     initial_interval_secs = 3
     max_consecutive_failures = 2
+    min_consecutive_successes = 2
     timeout_secs = 5
     shell_config = ShellHealthChecker(shell_command='failed command')
     task_config = TaskConfig(
@@ -347,7 +456,8 @@ class TestHealthCheckerProvider(unittest.TestCase):
                                     interval_secs=interval_secs,
                                     initial_interval_secs=initial_interval_secs,
                                     max_consecutive_failures=max_consecutive_failures,
-                                    timeout_secs=timeout_secs,
+                                    min_consecutive_successes=min_consecutive_successes,
+                                    timeout_secs=timeout_secs
                             )
                     ).json_dumps()
             )
@@ -380,6 +490,7 @@ class TestHealthCheckerProvider(unittest.TestCase):
     interval_secs = 17
     initial_interval_secs = 3
     max_consecutive_failures = 2
+    min_consecutive_successes = 2
     timeout_secs = 5
     shell_cmd = 'FOO_PORT={{thermos.ports[foo]}} failed command'
     shell_config = ShellHealthChecker(shell_command=shell_cmd)
@@ -393,7 +504,8 @@ class TestHealthCheckerProvider(unittest.TestCase):
                     interval_secs=interval_secs,
                     initial_interval_secs=initial_interval_secs,
                     max_consecutive_failures=max_consecutive_failures,
-                    timeout_secs=timeout_secs,
+                    min_consecutive_successes=min_consecutive_successes,
+                    timeout_secs=timeout_secs
                 )
             ).json_dumps()
         )
@@ -409,6 +521,7 @@ class TestHealthCheckerProvider(unittest.TestCase):
     interval_secs = 17
     initial_interval_secs = 3
     max_consecutive_failures = 2
+    min_consecutive_successes = 2
     timeout_secs = 5
     task_config = TaskConfig(
         executorConfig=ExecutorConfig(
@@ -419,7 +532,8 @@ class TestHealthCheckerProvider(unittest.TestCase):
                     interval_secs=interval_secs,
                     initial_interval_secs=initial_interval_secs,
                     max_consecutive_failures=max_consecutive_failures,
-                    timeout_secs=timeout_secs,
+                    min_consecutive_successes=min_consecutive_successes,
+                    timeout_secs=timeout_secs
                 )
             ).json_dumps()
         )
@@ -427,7 +541,7 @@ class TestHealthCheckerProvider(unittest.TestCase):
     # No health port and we don't have a shell_command.
     assigned_task = AssignedTask(task=task_config, instanceId=1, assignedPorts={'http': 9001})
     health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, None)
-    self.assertIsNone(health_checker)
+    assert isinstance(health_checker, NoopHealthChecker)
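
The behavioral change asserted here: when a task defines neither a health port nor a shell check, the provider now returns a no-op checker object instead of `None`, so downstream code no longer needs a null check. A rough stand-in for what such a checker looks like (illustrative only; the committed `NoopHealthChecker` lives in `apache.aurora.executor.common.health_checker`):

    from mesos.interface.mesos_pb2 import TaskState

    from apache.aurora.executor.common.status_checker import StatusResult

    class NoopHealthCheckerSketch(object):
      """Implements the StatusChecker surface with do-nothing start/stop."""

      @property
      def status(self):
        # Assumed behavior: with no health check configured, the task is
        # treated as healthy and reported RUNNING right away.
        return StatusResult('No health check defined, assuming healthy.',
                            TaskState.Value('TASK_RUNNING'))

      def start(self):
        pass

      def stop(self):
        pass
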
 
 
 class TestThreadedHealthChecker(unittest.TestCase):
@@ -439,17 +553,20 @@ class TestThreadedHealthChecker(unittest.TestCase):
     self.sandbox.exists.return_value = True
     self.sandbox.root = '/root'
 
-    self.initial_interval_secs = 1
-    self.interval_secs = 5
-    self.max_consecutive_failures = 2
+    self.initial_interval_secs = 15
+    self.interval_secs = 10
+    self.max_consecutive_failures = 1
+    self.min_consecutive_successes = 2
     self.clock = mock.Mock(spec=time)
-    self.clock.time.return_value = 1.0
+    self.clock.time.return_value = 0
+
     self.health_checker = HealthChecker(
         self.health,
         None,
         self.interval_secs,
         self.initial_interval_secs,
         self.max_consecutive_failures,
+        self.min_consecutive_successes,
         self.clock)
     self.health_checker_sandbox_exists = HealthChecker(
         self.health,
@@ -457,10 +574,11 @@ class TestThreadedHealthChecker(unittest.TestCase):
         self.interval_secs,
         self.initial_interval_secs,
         self.max_consecutive_failures,
+        self.min_consecutive_successes,
         self.clock)
 
   def test_perform_check_if_not_disabled_snooze_file_is_none(self):
-    self.health_checker.threaded_health_checker.snooze_file = None
+    self.health_checker_sandbox_exists.threaded_health_checker.snooze_file = None
     assert self.health.call_count == 0
     assert self.health_checker_sandbox_exists.metrics.sample()['snoozed'] == 0
     self.health_checker.threaded_health_checker._perform_check_if_not_disabled()
@@ -487,38 +605,411 @@ class TestThreadedHealthChecker(unittest.TestCase):
     assert self.health_checker_sandbox_exists.metrics.sample()['snoozed'] == 1
     assert result == (True, None)
 
-  def test_maybe_update_failure_count(self):
+  def test_maybe_update_health_check_count_reset_count(self):
+    hc = self.health_checker.threaded_health_checker
+    hc.running = True
+
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(True, 'reason-1')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 1
+
+    hc._maybe_update_health_check_count(False, 'reason-2')
+    assert hc.current_consecutive_failures == 1
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(True, 'reason-3')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 1
+
+  def test_maybe_update_health_check_count_ignore_failures_before_callback(self):
     hc = self.health_checker.threaded_health_checker
+    hc.running = False
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(False, 'reason-1')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(False, 'reason-2')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+
+  def test_maybe_update_health_check_count_dont_ignore_failures_after_callback(self):
+    hc = self.health_checker.threaded_health_checker
+    hc.running = True
 
     assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(False, 'reason-1')
+    assert hc.current_consecutive_failures == 1
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(False, 'reason-2')
+    assert hc.current_consecutive_failures == 2
+    assert hc.current_consecutive_successes == 0
+
+  def test_maybe_update_health_check_count_fail_fast(self):
+    hc = self.health_checker.threaded_health_checker
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
     assert hc.healthy is True
+    assert hc.running is False
 
-    hc._maybe_update_failure_count(True, 'reason')
+    hc.attempts += 1
+    hc._maybe_update_health_check_count(False, 'reason-1')
     assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+    assert hc.running is False
 
-    hc._maybe_update_failure_count(False, 'reason')
+    hc.attempts += 1
+    hc._maybe_update_health_check_count(False, 'reason-2')
     assert hc.current_consecutive_failures == 1
+    assert hc.current_consecutive_successes == 0
+    assert hc.running is False
+    assert hc.healthy is False
+    assert hc.reason == 'reason-2'
+
+  def test_maybe_update_health_check_count_max_failures(self):
+    hc = self.health_checker.threaded_health_checker
+    hc.running = True
+
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
     assert hc.healthy is True
 
-    hc._maybe_update_failure_count(False, 'reason')
-    assert hc.current_consecutive_failures == 2
+    hc._maybe_update_health_check_count(False, 'reason-1')
+    assert hc.current_consecutive_failures == 1
+    assert hc.current_consecutive_successes == 0
     assert hc.healthy is True
 
-    hc._maybe_update_failure_count(False, 'reason')
+    hc._maybe_update_health_check_count(False, 'reason-2')
+    assert hc.current_consecutive_failures == 2
+    assert hc.current_consecutive_successes == 0
     assert hc.healthy is False
-    assert hc.reason == 'reason'
+    assert hc.reason == 'reason-2'
 
-  @mock.patch('apache.aurora.executor.common.health_checker.ThreadedHealthChecker'
-      '._maybe_update_failure_count',
-      spec=ThreadedHealthChecker._maybe_update_failure_count)
-  def test_run(self, mock_maybe_update_failure_count):
+  def test_maybe_update_health_check_count_success(self):
+    hc = self.health_checker.threaded_health_checker
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+    assert hc.running is False
+    assert hc.healthy is True
+
+    hc._maybe_update_health_check_count(True, 'reason')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 1
+    assert hc.running is False
+    assert hc.healthy is True
+
+    hc._maybe_update_health_check_count(True, 'reason')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 2
+    assert hc.running is True
+    assert hc.healthy is True
+
+    hc._maybe_update_health_check_count(True, 'reason')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 3
+    assert hc.running is True
+    assert hc.healthy is True
+
+  def test_run_success(self):
+    self.health.return_value = (True, 'success')
+    mock_is_set = mock.Mock(spec=threading._Event.is_set)
+    liveness = [False, False, False, True]
+    mock_is_set.side_effect = lambda: liveness.pop(0)
+    self.health_checker.threaded_health_checker.dead.is_set = mock_is_set
+    self.health_checker.threaded_health_checker.run()
+    assert self.clock.sleep.call_count == 3
+    assert self.health_checker.threaded_health_checker.current_consecutive_failures == 0
+    assert self.health_checker.threaded_health_checker.current_consecutive_successes == 3
+    assert self.health_checker.threaded_health_checker.running is True
+    assert self.health_checker.threaded_health_checker.healthy is True
+    assert self.health_checker.threaded_health_checker.reason is None
+
+  def test_run_failure(self):
+    self.health.return_value = (False, 'failure')
     mock_is_set = mock.Mock(spec=threading._Event.is_set)
+    liveness = [False, False, False, True]
+    mock_is_set.side_effect = lambda: liveness.pop(0)
     self.health_checker.threaded_health_checker.dead.is_set = mock_is_set
-    liveness = [False, False, True]
-    self.health_checker.threaded_health_checker.dead.is_set.side_effect = lambda: liveness.pop(0)
     self.health_checker.threaded_health_checker.run()
     assert self.clock.sleep.call_count == 3
-    assert mock_maybe_update_failure_count.call_count == 2
+    assert self.health_checker.threaded_health_checker.current_consecutive_failures == 2
+    assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0
+    assert self.health_checker.threaded_health_checker.running is False
+    assert self.health_checker.threaded_health_checker.healthy is False
+    assert self.health_checker.threaded_health_checker.reason == 'failure'
+
+  def test_run_failure_unhealthy_when_failfast(self):
+    health_status = [(False, 'failure-1'), (True, None), (False, 'failure-3')]
+    self.health.side_effect = lambda: health_status.pop(0)
+    mock_is_set = mock.Mock(spec=threading._Event.is_set)
+    liveness = [False, False, False, True]
+    mock_is_set.side_effect = lambda: liveness.pop(0)
+    self.health_checker.threaded_health_checker.dead.is_set = mock_is_set
+    self.health_checker.threaded_health_checker.run()
+    assert self.clock.sleep.call_count == 3
+    assert self.health_checker.threaded_health_checker.current_consecutive_failures == 1
+    assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0
+    assert self.health_checker.threaded_health_checker.running is False
+    assert self.health_checker.threaded_health_checker.healthy is False
+    assert self.health_checker.threaded_health_checker.reason == 'failure-3'
+
+  def test_run_unhealthy_after_callback(self):
+    health_status = [(True, None), (True, None), (False, 'failure-4'), (False, 'failure-5')]
+    self.health.side_effect = lambda: health_status.pop(0)
+    mock_is_set = mock.Mock(spec=threading._Event.is_set)
+    liveness = [False, False, False, False, True]
+    mock_is_set.side_effect = lambda: liveness.pop(0)
+    self.health_checker.threaded_health_checker.dead.is_set = mock_is_set
+    self.health_checker.threaded_health_checker.run()
+    assert self.clock.sleep.call_count == 4
+    assert self.health_checker.threaded_health_checker.current_consecutive_failures == 2
+    assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0
+    assert self.health_checker.threaded_health_checker.running is True
+    assert self.health_checker.threaded_health_checker.healthy is False
+    assert self.health_checker.threaded_health_checker.reason == 'failure-5'
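
Taken together, the tests above pin down the new bookkeeping: consecutive successes promote the checker to `running` (the signal used to report TASK_RUNNING) once they reach `min_consecutive_successes`; once running, consecutive failures beyond `max_consecutive_failures` mark the task unhealthy; and failures during the initial grace period are ignored unless the checker can no longer accumulate the required successes, in which case it fails fast. A rough sketch of that logic, with the fail-fast predicate left as a stub (this is not the committed ThreadedHealthChecker, only the shape the assertions describe):

    class HealthCheckCounterSketch(object):
      def __init__(self, max_consecutive_failures, min_consecutive_successes):
        self.max_consecutive_failures = max_consecutive_failures
        self.min_consecutive_successes = min_consecutive_successes
        self.current_consecutive_failures = 0
        self.current_consecutive_successes = 0
        self.attempts = 0
        self.running = False   # True once the task has proven itself healthy
        self.healthy = True
        self.reason = None

      def _should_fail_fast(self):
        # Stub: the real checker gives up when the health checks remaining in
        # the grace period can no longer yield `min_consecutive_successes`.
        return False

      def _maybe_update_health_check_count(self, is_healthy, reason):
        if is_healthy:
          self.current_consecutive_failures = 0
          self.current_consecutive_successes += 1
          if (not self.running and
              self.current_consecutive_successes >= self.min_consecutive_successes):
            self.running = True
        else:
          self.current_consecutive_successes = 0
          if not self.running and not self._should_fail_fast():
            return  # failure inside the grace period: ignored
          self.current_consecutive_failures += 1
          if (self.current_consecutive_failures > self.max_consecutive_failures
              or self._should_fail_fast()):
            self.healthy = False
            self.reason = reason
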
+
+  @mock.patch('apache.aurora.executor.common.health_checker.ExceptionalThread.start',
+      spec=ExceptionalThread.start)
+  def test_start(self, mock_start):
+    assert mock_start.call_count == 0
+    self.health_checker.threaded_health_checker.start()
+    mock_start.assert_called_once_with(self.health_checker.threaded_health_checker)
+
+  def test_stop(self):
+    assert not self.health_checker.threaded_health_checker.dead.is_set()
+    self.health_checker.threaded_health_checker.stop()
+    assert self.health_checker.threaded_health_checker.dead.is_set()
+
+
+class TestThreadedHealthCheckerWithDefaults(unittest.TestCase):
+  '''
+    Tests similar to the above, but run with the default health check
+    configuration. This ensures that the defaults are always valid.
+  '''
+
+  def setUp(self):
+    self.health = mock.Mock()
+    self.health.return_value = (True, 'Fake')
+
+    self.sandbox = mock.Mock(spec_set=SandboxInterface)
+    self.sandbox.exists.return_value = True
+    self.sandbox.root = '/root'
+
+    self.health_checker = HealthCheckerProvider().from_assigned_task(
+        AssignedTask(
+            task=TaskConfig(
+                executorConfig=ExecutorConfig(
+                    name='thermos',
+                    data=MESOS_JOB(task=HELLO_WORLD).json_dumps())),
+            instanceId=1,
+            assignedPorts={'health': 9001}),
+        self.sandbox)
+
+    self.health_checker.threaded_health_checker.checker = self.health
+
+  def test_perform_check_if_not_disabled_snooze_file_is_none(self):
+    self.health_checker.threaded_health_checker.snooze_file = None
+    assert self.health.call_count == 0
+    assert self.health_checker.metrics.sample()['snoozed'] == 0
+    self.health_checker.threaded_health_checker._perform_check_if_not_disabled()
+    assert self.health.call_count == 1
+    assert self.health_checker.metrics.sample()['snoozed'] == 0
+
+  @mock.patch('os.path', spec_set=os.path)
+  def test_perform_check_if_not_disabled_no_snooze_file(self, mock_os_path):
+    mock_os_path.isfile.return_value = False
+    assert self.health.call_count == 0
+    assert self.health_checker.metrics.sample()['snoozed'] == 0
+    self.health_checker.threaded_health_checker._perform_check_if_not_disabled()
+    assert self.health.call_count == 1
+    assert self.health_checker.metrics.sample()['snoozed'] == 0
+
+  @mock.patch('os.path', spec_set=os.path)
+  def test_perform_check_if_not_disabled_snooze_file_exists(self, mock_os_path):
+    mock_os_path.isfile.return_value = True
+    assert self.health.call_count == 0
+    assert self.health_checker.metrics.sample()['snoozed'] == 0
+    result = (
+        self.health_checker.threaded_health_checker._perform_check_if_not_disabled())
+    assert self.health.call_count == 0
+    assert self.health_checker.metrics.sample()['snoozed'] == 1
+    assert result == (True, None)
+
+  def test_maybe_update_health_check_count_reset_count(self):
+    hc = self.health_checker.threaded_health_checker
+    hc.running = True
+
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(True, 'reason-1')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 1
+
+    hc._maybe_update_health_check_count(False, 'reason-2')
+    assert hc.current_consecutive_failures == 1
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(True, 'reason-3')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 1
+
+  def test_maybe_update_health_check_count_ignore_failures_before_callback(self):
+    hc = self.health_checker.threaded_health_checker
+    hc.running = False
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(False, 'reason-1')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(False, 'reason-2')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+
+  def test_maybe_update_health_check_count_dont_ignore_failures_after_callback(self):
+    hc = self.health_checker.threaded_health_checker
+    hc.running = True
+
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(False, 'reason-1')
+    assert hc.current_consecutive_failures == 1
+    assert hc.current_consecutive_successes == 0
+
+    hc._maybe_update_health_check_count(False, 'reason-2')
+    assert hc.current_consecutive_failures == 2
+    assert hc.current_consecutive_successes == 0
+
+  def test_maybe_update_health_check_count_fail_fast(self):
+    hc = self.health_checker.threaded_health_checker
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+    assert hc.healthy is True
+    assert hc.running is False
+
+    hc.attempts += 1
+    hc._maybe_update_health_check_count(False, 'reason-1')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+    assert hc.running is False
+
+    hc.attempts += 1
+    hc._maybe_update_health_check_count(False, 'reason-2')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+    assert hc.running is False
+
+    hc.attempts += 1
+    hc._maybe_update_health_check_count(False, 'reason-3')
+    assert hc.current_consecutive_failures == 1
+    assert hc.current_consecutive_successes == 0
+    assert hc.running is False
+    assert hc.healthy is False
+    assert hc.reason == 'reason-3'
+
+  def test_maybe_update_health_check_count_max_failures(self):
+    hc = self.health_checker.threaded_health_checker
+    hc.running = True
+
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+    assert hc.healthy is True
+
+    hc._maybe_update_health_check_count(False, 'reason-1')
+    assert hc.current_consecutive_failures == 1
+    assert hc.current_consecutive_successes == 0
+    assert hc.healthy is False
+    assert hc.reason == 'reason-1'
+
+  def test_maybe_update_health_check_count_success(self):
+    hc = self.health_checker.threaded_health_checker
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 0
+    assert hc.healthy is True
+    assert hc.running is False
+
+    hc._maybe_update_health_check_count(True, 'reason')
+    assert hc.current_consecutive_failures == 0
+    assert hc.current_consecutive_successes == 1
+    assert hc.running is True
+    assert hc.healthy is True
+
+  @mock.patch('apache.aurora.executor.common.health_checker.time.sleep', spec=time.sleep)
+  def test_run_success(self, mock_sleep):
+    mock_sleep.return_value = None
+    self.health.return_value = (True, 'success')
+    mock_is_set = mock.Mock(spec=threading._Event.is_set)
+    liveness = [False, False, False, True]
+    mock_is_set.side_effect = lambda: liveness.pop(0)
+    self.health_checker.threaded_health_checker.dead.is_set = mock_is_set
+    self.health_checker.threaded_health_checker.run()
+    assert mock_sleep.call_count == 3
+    assert self.health_checker.threaded_health_checker.current_consecutive_failures == 0
+    assert self.health_checker.threaded_health_checker.current_consecutive_successes == 3
+    assert self.health_checker.threaded_health_checker.running is True
+    assert self.health_checker.threaded_health_checker.healthy is True
+    assert self.health_checker.threaded_health_checker.reason is None
+
+  @mock.patch('apache.aurora.executor.common.health_checker.time.sleep', spec=time.sleep)
+  def test_run_failure(self, mock_sleep):
+    mock_sleep.return_value = None
+    self.health.return_value = (False, 'failure')
+    mock_is_set = mock.Mock(spec=threading._Event.is_set)
+    liveness = [False, False, False, True]
+    mock_is_set.side_effect = lambda: liveness.pop(0)
+    self.health_checker.threaded_health_checker.dead.is_set = mock_is_set
+    self.health_checker.threaded_health_checker.run()
+    assert mock_sleep.call_count == 3
+    assert self.health_checker.threaded_health_checker.current_consecutive_failures == 1
+    assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0
+    assert self.health_checker.threaded_health_checker.running is False
+    assert self.health_checker.threaded_health_checker.healthy is False
+    assert self.health_checker.threaded_health_checker.reason == 'failure'
+
+  @mock.patch('apache.aurora.executor.common.health_checker.time.sleep', spec=time.sleep)
+  def test_run_failure_unhealthy_when_failfast(self, mock_sleep):
+    mock_sleep.return_value = None
+    health_status = [(False, 'failure-1'), (False, 'failure-2'), (False, 'failure-3')]
+    self.health.side_effect = lambda: health_status.pop(0)
+    mock_is_set = mock.Mock(spec=threading._Event.is_set)
+    liveness = [False, False, False, True]
+    mock_is_set.side_effect = lambda: liveness.pop(0)
+    self.health_checker.threaded_health_checker.dead.is_set = mock_is_set
+    self.health_checker.threaded_health_checker.run()
+    assert mock_sleep.call_count == 3
+    assert self.health_checker.threaded_health_checker.current_consecutive_failures == 1
+    assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0
+    assert self.health_checker.threaded_health_checker.running is False
+    assert self.health_checker.threaded_health_checker.healthy is False
+    assert self.health_checker.threaded_health_checker.reason == 'failure-3'
+
+  @mock.patch('apache.aurora.executor.common.health_checker.time.sleep', spec=time.sleep)
+  def test_run_unhealthy_after_callback(self, mock_sleep):
+    mock_sleep.return_value = None
+    health_status = [(True, None), (True, None), (False, 'failure-4'), (False, 'failure-5')]
+    self.health.side_effect = lambda: health_status.pop(0)
+    mock_is_set = mock.Mock(spec=threading._Event.is_set)
+    liveness = [False, False, False, False, True]
+    mock_is_set.side_effect = lambda: liveness.pop(0)
+    self.health_checker.threaded_health_checker.dead.is_set = mock_is_set
+    self.health_checker.threaded_health_checker.run()
+    assert mock_sleep.call_count == 4
+    assert self.health_checker.threaded_health_checker.current_consecutive_failures == 2
+    assert self.health_checker.threaded_health_checker.current_consecutive_successes == 0
+    assert self.health_checker.threaded_health_checker.running is True
+    assert self.health_checker.threaded_health_checker.healthy is False
+    assert self.health_checker.threaded_health_checker.reason == 'failure-5'
 
   @mock.patch('apache.aurora.executor.common.health_checker.ExceptionalThread.start',
       spec=ExceptionalThread.start)

http://git-wip-us.apache.org/repos/asf/aurora/blob/2992c8b4/src/test/python/apache/aurora/executor/common/test_status_checker.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_status_checker.py b/src/test/python/apache/aurora/executor/common/test_status_checker.py
index 5be1981..3d5fe12 100644
--- a/src/test/python/apache/aurora/executor/common/test_status_checker.py
+++ b/src/test/python/apache/aurora/executor/common/test_status_checker.py
@@ -14,6 +14,7 @@
 
 import threading
 
+import pytest
 from mesos.interface.mesos_pb2 import TaskState
 
 from apache.aurora.executor.common.status_checker import (
@@ -23,12 +24,16 @@ from apache.aurora.executor.common.status_checker import (
     StatusResult
 )
 
+TASK_STARTING = StatusResult(None, TaskState.Value('TASK_STARTING'))
+TASK_RUNNING = StatusResult(None, TaskState.Value('TASK_RUNNING'))
+TASK_FAILED = StatusResult(None, TaskState.Value('TASK_FAILED'))
+
 
 class EventHealth(StatusChecker):
-  def __init__(self):
+  def __init__(self, status=None):
     self.started = threading.Event()
     self.stopped = threading.Event()
-    self._status = None
+    self._status = status
 
   @property
   def status(self):
@@ -73,3 +78,110 @@ def test_chained_health_interface():
   chained_si.stop()
   for si in (si1, si2):
     assert si.stopped.is_set()
+
+
+def test_chained_empty_checkers():
+  hi = ChainedStatusChecker([])
+  assert hi.status is None
+
+
+def test_chained_healthy_status_none():
+  hi = ChainedStatusChecker([EventHealth()])
+  assert hi.status is None
+
+  hi = ChainedStatusChecker([EventHealth(), EventHealth(), EventHealth()])
+  assert hi.status is None
+
+
+def test_chained_healthy_status_starting():
+  hi = ChainedStatusChecker([EventHealth(TASK_STARTING)])
+  assert hi.status is TASK_STARTING
+
+  hi = ChainedStatusChecker([EventHealth(TASK_STARTING),
+      EventHealth(TASK_STARTING),
+      EventHealth(TASK_STARTING)])
+  assert hi.status is TASK_STARTING
+
+
+def test_chained_healthy_status_running():
+  hi = ChainedStatusChecker([EventHealth(TASK_RUNNING)])
+  assert hi.status is TASK_RUNNING
+
+  hi = ChainedStatusChecker([EventHealth(TASK_RUNNING),
+      EventHealth(TASK_RUNNING),
+      EventHealth(TASK_RUNNING)])
+  assert hi.status is TASK_RUNNING
+
+
+def test_chained_healthy_status_failed():
+  hi = ChainedStatusChecker([EventHealth(TASK_FAILED)])
+  assert hi.status is TASK_FAILED
+
+  hi = ChainedStatusChecker([EventHealth(TASK_FAILED),
+      EventHealth(TASK_FAILED),
+      EventHealth(TASK_FAILED)])
+  assert hi.status is TASK_FAILED
+
+
+def test_chained_status_failed_trumps_all():
+  hi = ChainedStatusChecker([EventHealth(),
+      EventHealth(TASK_RUNNING),
+      EventHealth(TASK_STARTING),
+      EventHealth(TASK_FAILED)])
+  assert hi.status is TASK_FAILED
+
+  hi = ChainedStatusChecker([EventHealth(TASK_FAILED),
+      EventHealth(TASK_STARTING),
+      EventHealth(TASK_RUNNING),
+      EventHealth()])
+  assert hi.status is TASK_FAILED
+
+
+def test_chained_status_starting_trumps_running_and_none():
+  hi = ChainedStatusChecker([EventHealth(), EventHealth(TASK_RUNNING), EventHealth(TASK_STARTING)])
+  assert hi.status is TASK_STARTING
+
+  hi = ChainedStatusChecker([EventHealth(TASK_STARTING), EventHealth(TASK_RUNNING), EventHealth()])
+  assert hi.status is TASK_STARTING
+
+
+def test_chained_status_running_trumps_none():
+  hi = ChainedStatusChecker([EventHealth(TASK_RUNNING), EventHealth()])
+  assert hi.status is TASK_RUNNING
+
+  hi = ChainedStatusChecker([EventHealth(), EventHealth(TASK_RUNNING)])
+  assert hi.status is TASK_RUNNING
+
+
+def test_chained_status_starting_to_running_consensus():
+  eh1 = EventHealth(TASK_STARTING)
+  eh2 = EventHealth(TASK_STARTING)
+  hi = ChainedStatusChecker([eh1, eh2])
+  assert hi.status is TASK_STARTING
+
+  eh1.set_status(TASK_RUNNING)
+  assert hi.status is TASK_STARTING
+
+  eh2.set_status(TASK_RUNNING)
+  assert hi.status is TASK_RUNNING
+
+
+def test_chained_status_failed_is_terminal():
+  eh = EventHealth(TASK_FAILED)
+  hi = ChainedStatusChecker([eh])
+  assert hi.status is TASK_FAILED
+
+  eh.set_status(TASK_RUNNING)
+  assert hi.status is TASK_FAILED
+
+  eh.set_status(TASK_STARTING)
+  assert hi.status is TASK_FAILED
+
+  eh.set_status(None)
+  assert hi.status is TASK_FAILED
+
+
+def test_chained_status_raises_unknown_status_result():
+  hi = ChainedStatusChecker([EventHealth(1)])
+  with pytest.raises(TypeError):
+    hi.status
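
These tests fix the aggregation rule for chained checkers: a TASK_FAILED result wins over everything and is latched once seen, TASK_STARTING wins over TASK_RUNNING (so RUNNING effectively requires consensus across all checkers), RUNNING wins over "no opinion" (`None`), and anything that is not a `StatusResult` is rejected. A compact sketch of that rule (illustrative only; the committed `ChainedStatusChecker` lives in `status_checker.py` and may differ in detail):

    from mesos.interface.mesos_pb2 import TaskState

    from apache.aurora.executor.common.status_checker import StatusResult

    class ChainedStatusSketch(object):
      # Lowest to highest precedence.
      _PRECEDENCE = [TaskState.Value('TASK_RUNNING'),
                     TaskState.Value('TASK_STARTING'),
                     TaskState.Value('TASK_FAILED')]

      def __init__(self, checkers):
        self._checkers = checkers
        self._terminal = None   # latched TASK_FAILED result, if any

      @property
      def status(self):
        if self._terminal is not None:
          return self._terminal
        winner = None
        for checker in self._checkers:
          result = checker.status
          if result is None:
            continue            # this checker has no opinion yet
          if not isinstance(result, StatusResult):
            raise TypeError('Expected StatusResult, got %r' % (result,))
          if (winner is None or
              self._PRECEDENCE.index(result.status) >
              self._PRECEDENCE.index(winner.status)):
            winner = result
        if winner is not None and winner.status == TaskState.Value('TASK_FAILED'):
          self._terminal = winner  # a failed result is terminal
        return winner
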

