aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wick...@apache.org
Subject incubator-aurora git commit: Instrument the HealthChecker to export stats.
Date Sat, 21 Feb 2015 00:01:22 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 439a168f9 -> e5de618cc


Instrument the HealthChecker to export stats.

Testing Done:
./pants test.pytest --no-fast src/test/python/apache/aurora/executor/common::

Bugs closed: AURORA-1062

Reviewed at https://reviews.apache.org/r/30647/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/e5de618c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/e5de618c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/e5de618c

Branch: refs/heads/master
Commit: e5de618ccbe778f2f77956c7d8c7904e4452cdf7
Parents: 439a168
Author: Brian Wickman <wickman@apache.org>
Authored: Fri Feb 20 16:01:17 2015 -0800
Committer: Brian Wickman <wickman@apache.org>
Committed: Fri Feb 20 16:01:17 2015 -0800

----------------------------------------------------------------------
 .../aurora/executor/common/health_checker.py    |  38 ++++++-
 .../aurora/executor/common/status_checker.py    |   4 +-
 .../executor/common/test_health_checker.py      | 106 +++++++++++++------
 .../aurora/executor/test_thermos_executor.py    |   4 -
 4 files changed, 112 insertions(+), 40 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e5de618c/src/main/python/apache/aurora/executor/common/health_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py
index 60676ba..0d3365d 100644
--- a/src/main/python/apache/aurora/executor/common/health_checker.py
+++ b/src/main/python/apache/aurora/executor/common/health_checker.py
@@ -19,6 +19,7 @@ import time
 from mesos.interface.mesos_pb2 import TaskState
 from twitter.common import log
 from twitter.common.exceptions import ExceptionalThread
+from twitter.common.metrics import LambdaGauge
 
 from apache.aurora.common.http_signaler import HttpSignaler
 
@@ -63,6 +64,7 @@ class ThreadedHealthChecker(ExceptionalThread):
     self.interval = interval_secs
     self.max_consecutive_failures = max_consecutive_failures
     self.snooze_file = None
+    self.snoozed = False
 
     if self.sandbox and self.sandbox.exists():
       self.snooze_file = os.path.join(self.sandbox.root, '.healthchecksnooze')
@@ -81,9 +83,11 @@ class ThreadedHealthChecker(ExceptionalThread):
 
   def _perform_check_if_not_disabled(self):
     if self.snooze_file and os.path.isfile(self.snooze_file):
+      self.snoozed = True
       log.info("Health check snooze file found at %s. Health checks disabled.", self.snooze_file)
       return True, None
 
+    self.snoozed = False
     log.debug("Health checks enabled. Performing health check.")
     return self.checker()
 
@@ -123,6 +127,14 @@ class HealthChecker(StatusChecker):
     health_checker should be a callable returning a tuple of (boolean, reason), indicating
     respectively the health of the service and the reason for its failure (or None if the service is
     still healthy).
+
+    Exported metrics:
+      health_checker.consecutive_failures: Number of consecutive failures observed.  Resets
+        to zero on successful health check.
+      health_checker.snoozed: Returns 1 if the health checker is snoozed, 0 if not.
+      health_checker.total_latency_secs: Total time waiting for the health checker to respond in
+        seconds. To get average latency, use health_checker.total_latency / health_checker.checks.
+      health_checker.checks: Total number of health checks performed.
   """
 
   def __init__(self,
@@ -132,13 +144,34 @@ class HealthChecker(StatusChecker):
                initial_interval_secs=None,
                max_consecutive_failures=0,
                clock=time):
+    self._health_checks = 0
+    self._total_latency = 0
+    self._stats_lock = threading.Lock()
+    self._clock = clock
     self.threaded_health_checker = ThreadedHealthChecker(
-        health_checker,
+        self._timing_wrapper(health_checker),
         sandbox,
         interval_secs,
         initial_interval_secs,
         max_consecutive_failures,
         clock)
+    self.metrics.register(LambdaGauge('consecutive_failures',
+        lambda: self.threaded_health_checker.current_consecutive_failures))
+    self.metrics.register(LambdaGauge('snoozed', lambda: int(self.threaded_health_checker.snoozed)))
+    self.metrics.register(LambdaGauge('total_latency_secs', lambda: self._total_latency))
+    self.metrics.register(LambdaGauge('checks', lambda: self._health_checks))
+
+  def _timing_wrapper(self, closure):
+    """A wrapper around the health check closure that times the health check duration."""
+    def wrapper(*args, **kw):
+      start = self._clock.time()
+      success, failure_reason = closure(*args, **kw)
+      stop = self._clock.time()
+      with self._stats_lock:
+        self._health_checks += 1
+        self._total_latency += stop - start
+      return (success, failure_reason)
+    return wrapper
 
   @property
   def status(self):
@@ -146,6 +179,9 @@ class HealthChecker(StatusChecker):
       return StatusResult('Failed health check! %s' % self.threaded_health_checker.reason,
           TaskState.Value('TASK_FAILED'))
 
+  def name(self):
+    return 'health_checker'
+
   def start(self):
     super(HealthChecker, self).start()
     self.threaded_health_checker.start()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e5de618c/src/main/python/apache/aurora/executor/common/status_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/status_checker.py b/src/main/python/apache/aurora/executor/common/status_checker.py
index 624921d..795dae2 100644
--- a/src/main/python/apache/aurora/executor/common/status_checker.py
+++ b/src/main/python/apache/aurora/executor/common/status_checker.py
@@ -17,7 +17,7 @@ from abc import abstractmethod, abstractproperty
 from mesos.interface.mesos_pb2 import TaskState
 from twitter.common import log
 from twitter.common.lang import Interface
-from twitter.common.metrics import NamedGauge, Observable
+from twitter.common.metrics import Observable
 
 
 class StatusResult(object):
@@ -63,7 +63,7 @@ class StatusChecker(Observable, Interface):
 
   def start(self):
     """Invoked once the task has been started."""
-    self.metrics.register(NamedGauge('enabled', 1))
+    pass
 
   def stop(self):
     """Invoked once a non-None status has been reported."""

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e5de618c/src/test/python/apache/aurora/executor/common/test_health_checker.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py
index a4e215d..4e09d30 100644
--- a/src/test/python/apache/aurora/executor/common/test_health_checker.py
+++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py
@@ -105,26 +105,62 @@ class TestHealthChecker(unittest.TestCase):
     self._clock.tick(initial_interval_secs)
     assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
     assert hct.status is None
+    assert hct.metrics.sample()['consecutive_failures'] == 1
     self._clock.tick(interval_secs)
     assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
     assert hct.status is None
+    assert hct.metrics.sample()['consecutive_failures'] == 2
     self._clock.tick(interval_secs)
     assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
     assert hct.status is None
+    assert hct.metrics.sample()['consecutive_failures'] == 0
 
     # 3 consecutive health check failures.
     self._clock.tick(interval_secs)
     assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
     assert hct.status is None
+    assert hct.metrics.sample()['consecutive_failures'] == 1
     self._clock.tick(interval_secs)
     assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
     assert hct.status is None
+    assert hct.metrics.sample()['consecutive_failures'] == 2
     self._clock.tick(interval_secs)
     assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
     assert hct.status.status == TaskState.Value('TASK_FAILED')
+    assert hct.metrics.sample()['consecutive_failures'] == 3
     hct.stop()
     assert self._checker.health.call_count == 6
 
+  def test_health_checker_metrics(self):
+    def slow_check():
+      self._clock.sleep(0.5)
+      return (True, None)
+    hct = HealthChecker(slow_check, interval_secs=1, initial_interval_secs=1, clock=self._clock)
+    hct.start()
+    assert hct._total_latency == 0
+    assert hct.metrics.sample()['total_latency_secs'] == 0
+
+    # start the health check (during health check it is still 0)
+    self._clock.tick(1.0)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    assert hct._total_latency == 0
+    assert hct.metrics.sample()['total_latency_secs'] == 0
+    assert hct.metrics.sample()['checks'] == 0
+
+    # finish the health check
+    self._clock.tick(0.5)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    assert hct._total_latency == 0.5
+    assert hct.metrics.sample()['total_latency_secs'] == 0.5
+    assert hct.metrics.sample()['checks'] == 1
+
+    # tick again
+    self._clock.tick(1.5)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    assert hct._total_latency == 1.0
+    assert hct.metrics.sample()['total_latency_secs'] == 1.0
+    assert hct.metrics.sample()['checks'] == 2
+
 
 class TestHealthCheckerProvider(unittest.TestCase):
   def test_from_assigned_task(self):
@@ -166,15 +202,15 @@ class TestThreadedHealthChecker(unittest.TestCase):
     self.interval_secs = 5
     self.max_consecutive_failures = 2
     self.clock = mock.Mock(spec=time)
-    self.threaded_health_checker = ThreadedHealthChecker(
+    self.clock.time.return_value = 1.0
+    self.health_checker = HealthChecker(
         self.signaler.health,
         None,
         self.interval_secs,
         self.initial_interval_secs,
         self.max_consecutive_failures,
         self.clock)
-
-    self.threaded_health_checker_sandbox_exists = ThreadedHealthChecker(
+    self.health_checker_sandbox_exists = HealthChecker(
         self.signaler.health,
         self.sandbox,
         self.interval_secs,
@@ -183,59 +219,63 @@ class TestThreadedHealthChecker(unittest.TestCase):
         self.clock)
 
   def test_perform_check_if_not_disabled_snooze_file_is_none(self):
-    self.threaded_health_checker.snooze_file = None
-
+    self.health_checker.threaded_health_checker.snooze_file = None
     assert self.signaler.health.call_count == 0
-    self.threaded_health_checker._perform_check_if_not_disabled()
+    assert self.health_checker_sandbox_exists.metrics.sample()['snoozed'] == 0
+    self.health_checker.threaded_health_checker._perform_check_if_not_disabled()
     assert self.signaler.health.call_count == 1
+    assert self.health_checker_sandbox_exists.metrics.sample()['snoozed'] == 0
 
   @mock.patch('os.path', spec_set=os.path)
   def test_perform_check_if_not_disabled_no_snooze_file(self, mock_os_path):
     mock_os_path.isfile.return_value = False
-
     assert self.signaler.health.call_count == 0
-    self.threaded_health_checker_sandbox_exists._perform_check_if_not_disabled()
+    assert self.health_checker_sandbox_exists.metrics.sample()['snoozed'] == 0
+    self.health_checker_sandbox_exists.threaded_health_checker._perform_check_if_not_disabled()
     assert self.signaler.health.call_count == 1
+    assert self.health_checker_sandbox_exists.metrics.sample()['snoozed'] == 0
 
   @mock.patch('os.path', spec_set=os.path)
   def test_perform_check_if_not_disabled_snooze_file_exists(self, mock_os_path):
     mock_os_path.isfile.return_value = True
-
     assert self.signaler.health.call_count == 0
-    result = self.threaded_health_checker_sandbox_exists._perform_check_if_not_disabled()
+    assert self.health_checker_sandbox_exists.metrics.sample()['snoozed'] == 0
+    result = (
+        self.health_checker_sandbox_exists.threaded_health_checker._perform_check_if_not_disabled())
     assert self.signaler.health.call_count == 0
+    assert self.health_checker_sandbox_exists.metrics.sample()['snoozed'] == 1
     assert result == (True, None)
 
   def test_maybe_update_failure_count(self):
-    assert self.threaded_health_checker.current_consecutive_failures == 0
-    assert self.threaded_health_checker.healthy is True
+    hc = self.health_checker.threaded_health_checker
+
+    assert hc.current_consecutive_failures == 0
+    assert hc.healthy is True
 
-    self.threaded_health_checker._maybe_update_failure_count(True, 'reason')
-    assert self.threaded_health_checker.current_consecutive_failures == 0
+    hc._maybe_update_failure_count(True, 'reason')
+    assert hc.current_consecutive_failures == 0
 
-    self.threaded_health_checker._maybe_update_failure_count(False, 'reason')
-    assert self.threaded_health_checker.current_consecutive_failures == 1
-    assert self.threaded_health_checker.healthy is True
+    hc._maybe_update_failure_count(False, 'reason')
+    assert hc.current_consecutive_failures == 1
+    assert hc.healthy is True
 
-    self.threaded_health_checker._maybe_update_failure_count(False, 'reason')
-    assert self.threaded_health_checker.current_consecutive_failures == 2
-    assert self.threaded_health_checker.healthy is True
+    hc._maybe_update_failure_count(False, 'reason')
+    assert hc.current_consecutive_failures == 2
+    assert hc.healthy is True
 
-    self.threaded_health_checker._maybe_update_failure_count(False, 'reason')
-    assert self.threaded_health_checker.healthy is False
-    assert self.threaded_health_checker.reason == 'reason'
+    hc._maybe_update_failure_count(False, 'reason')
+    assert hc.healthy is False
+    assert hc.reason == 'reason'
 
   @mock.patch('apache.aurora.executor.common.health_checker.ThreadedHealthChecker'
       '._maybe_update_failure_count',
       spec=ThreadedHealthChecker._maybe_update_failure_count)
   def test_run(self, mock_maybe_update_failure_count):
     mock_is_set = mock.Mock(spec=threading._Event.is_set)
-    self.threaded_health_checker.dead.is_set = mock_is_set
+    self.health_checker.threaded_health_checker.dead.is_set = mock_is_set
     liveness = [False, False, True]
-    def is_set():
-      return liveness.pop(0)
-    self.threaded_health_checker.dead.is_set.side_effect = is_set
-    self.threaded_health_checker.run()
+    self.health_checker.threaded_health_checker.dead.is_set.side_effect = lambda: liveness.pop(0)
+    self.health_checker.threaded_health_checker.run()
     assert self.clock.sleep.call_count == 3
     assert mock_maybe_update_failure_count.call_count == 2
 
@@ -243,10 +283,10 @@ class TestThreadedHealthChecker(unittest.TestCase):
       spec=ExceptionalThread.start)
   def test_start(self, mock_start):
     assert mock_start.call_count == 0
-    self.threaded_health_checker.start()
-    mock_start.assert_called_once_with(self.threaded_health_checker)
+    self.health_checker.threaded_health_checker.start()
+    mock_start.assert_called_once_with(self.health_checker.threaded_health_checker)
 
   def test_stop(self):
-    assert not self.threaded_health_checker.dead.is_set()
-    self.threaded_health_checker.stop()
-    assert self.threaded_health_checker.dead.is_set()
+    assert not self.health_checker.threaded_health_checker.dead.is_set()
+    self.health_checker.threaded_health_checker.stop()
+    assert self.health_checker.threaded_health_checker.dead.is_set()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/e5de618c/src/test/python/apache/aurora/executor/test_thermos_executor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/test_thermos_executor.py b/src/test/python/apache/aurora/executor/test_thermos_executor.py
index c8fab30..8dbfb1d 100644
--- a/src/test/python/apache/aurora/executor/test_thermos_executor.py
+++ b/src/test/python/apache/aurora/executor/test_thermos_executor.py
@@ -199,10 +199,6 @@ def make_executor(
   te.launchTask(proxy_driver, task_description)
 
   te.status_manager_started.wait()
-  sampled_metrics = te.metrics.sample()
-  assert 'kill_manager.enabled' in sampled_metrics
-  for checker in te._chained_checker._status_checkers:  # hacky
-    assert ('%s.enabled' % checker.name()) in sampled_metrics
 
   while len(proxy_driver.method_calls['sendStatusUpdate']) < 2:
     time.sleep(0.1)


Mime
View raw message