aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wick...@apache.org
Subject git commit: Preserve executor HealthCheckerThread name
Date Fri, 05 Sep 2014 16:33:49 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 9a33c673f -> 9a602a779


Preserve executor HealthCheckerThread name

Testing Done:

$ ./build-support/python/isort-run
$ ./build-support/python/checkstyle-check
$ ./pants ./src/test/python/apache/aurora/executor:all

Bugs closed: AURORA-682

Reviewed at https://reviews.apache.org/r/25337/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/9a602a77
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/9a602a77
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/9a602a77

Branch: refs/heads/master
Commit: 9a602a7796f3fe87265e4e5b9c813f1950377722
Parents: 9a33c67
Author: Joe Smith <yasumoto7@gmail.com>
Authored: Fri Sep 5 09:33:38 2014 -0700
Committer: Brian Wickman <wickman@apache.org>
Committed: Fri Sep 5 09:33:38 2014 -0700

----------------------------------------------------------------------
 .../aurora/executor/common/health_checker.py    | 125 ++++++++++-----
 .../python/apache/aurora/executor/common/BUILD  |  16 +-
 .../apache/aurora/executor/common/fixtures.py   |  43 ++++++
 .../executor/common/test_health_checker.py      | 151 +++++++++++++++----
 .../aurora/executor/common/test_task_info.py    |  30 +---
 5 files changed, 265 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9a602a77/src/main/python/apache/aurora/executor/common/health_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py
index 603fff3..bed9aee 100644
--- a/src/main/python/apache/aurora/executor/common/health_checker.py
+++ b/src/main/python/apache/aurora/executor/common/health_checker.py
@@ -25,8 +25,8 @@ from .status_checker import StatusChecker, StatusCheckerProvider, StatusResult
 from .task_info import mesos_task_instance_from_assigned_task, resolve_ports
 
 
-class HealthCheckerThread(StatusChecker, ExceptionalThread):
-  """Generic, StatusChecker-conforming thread for arbitrary periodic health checks
+class ThreadedHealthChecker(ExceptionalThread):
+  """Perform a health check to determine if a service is healthy or not
 
     health_checker should be a callable returning a tuple of (boolean, reason), indicating
     respectively the health of the service and the reason for its failure (or None if the service is
@@ -34,62 +34,105 @@ class HealthCheckerThread(StatusChecker, ExceptionalThread):
   """
 
   def __init__(self,
-               health_checker,
-               interval_secs=10,
-               initial_interval_secs=None,
-               max_consecutive_failures=0,
-               clock=time):
-    self._checker = health_checker
-    self._interval = interval_secs
+      health_checker,
+      interval_secs,
+      initial_interval_secs,
+      max_consecutive_failures,
+      clock):
+    """
+    :param health_checker: health checker to confirm service health
+    :type health_checker: function that returns (boolean, <string>)
+    :param interval_secs: delay between checks
+    :type interval_secs: int
+    :param initial_interval_secs: seconds to wait before starting checks
+    :type initial_interval_secs: int
+    :param max_consecutive_failures: number of failures to allow before marking dead
+    :type max_consecutive_failures: int
+    :param clock: time module available to be mocked for testing
+    :type clock: time module
+    """
+    self.checker = health_checker
+    self.clock = clock
+    self.current_consecutive_failures = 0
+    self.dead = threading.Event()
+    self.interval = interval_secs
+    self.max_consecutive_failures = max_consecutive_failures
+
     if initial_interval_secs is not None:
-      self._initial_interval = initial_interval_secs
+      self.initial_interval = initial_interval_secs
     else:
-      self._initial_interval = interval_secs * 2
-    self._current_consecutive_failures = 0
-    self._max_consecutive_failures = max_consecutive_failures
-    self._dead = threading.Event()
-    if self._initial_interval > 0:
-      self._healthy, self._reason = True, None
+      self.initial_interval = interval_secs * 2
+
+    if self.initial_interval > 0:
+      self.healthy, self.reason = True, None
     else:
-      self._healthy, self._reason = self._checker()
-    self._clock = clock
-    super(HealthCheckerThread, self).__init__()
+      self.healthy, self.reason = self.checker()
+    super(ThreadedHealthChecker, self).__init__()
     self.daemon = True
 
-  @property
-  def status(self):
-    if not self._healthy:
-      return StatusResult('Failed health check! %s' % self._reason, TaskState.Value('TASK_FAILED'))
-
-  def run(self):
-    log.debug('Health checker thread started.')
-    self._clock.sleep(self._initial_interval)
-    log.debug('Initial interval expired.')
-    while not self._dead.is_set():
-      self._maybe_update_failure_count(*self._checker())
-      self._clock.sleep(self._interval)
-
   def _maybe_update_failure_count(self, is_healthy, reason):
     if not is_healthy:
       log.warning('Health check failure: %s' % reason)
-      self._current_consecutive_failures += 1
-      if self._current_consecutive_failures > self._max_consecutive_failures:
+      self.current_consecutive_failures += 1
+      if self.current_consecutive_failures > self.max_consecutive_failures:
         log.warning('Reached consecutive failure limit.')
-        self._healthy = False
-        self._reason = reason
+        self.healthy = False
+        self.reason = reason
     else:
-      if self._current_consecutive_failures > 0:
+      if self.current_consecutive_failures > 0:
         log.debug('Reset consecutive failures counter.')
-      self._current_consecutive_failures = 0
+      self.current_consecutive_failures = 0
+
+  def run(self):
+    log.debug('Health checker thread started.')
+    self.clock.sleep(self.initial_interval)
+    log.debug('Initial interval expired.')
+    while not self.dead.is_set():
+      is_healthy, reason = self.checker()
+      self._maybe_update_failure_count(is_healthy, reason)
+      self.clock.sleep(self.interval)
 
   def start(self):
-    StatusChecker.start(self)
     ExceptionalThread.start(self)
 
   def stop(self):
     log.debug('Health checker thread stopped.')
-    self._dead.set()
+    self.dead.set()
+
 
+class HealthChecker(StatusChecker):
+  """Generic StatusChecker-conforming class which uses a thread for arbitrary periodic health checks
+
+    health_checker should be a callable returning a tuple of (boolean, reason), indicating
+    respectively the health of the service and the reason for its failure (or None if the service is
+    still healthy).
+  """
+
+  def __init__(self,
+               health_checker,
+               interval_secs=10,
+               initial_interval_secs=None,
+               max_consecutive_failures=0,
+               clock=time):
+    self.threaded_health_checker = ThreadedHealthChecker(
+        health_checker,
+        interval_secs,
+        initial_interval_secs,
+        max_consecutive_failures,
+        clock)
+
+  @property
+  def status(self):
+    if not self.threaded_health_checker.healthy:
+      return StatusResult('Failed health check! %s' % self.threaded_health_checker.reason,
+          TaskState.Value('TASK_FAILED'))
+
+  def start(self):
+    super(HealthChecker, self).start()
+    self.threaded_health_checker.start()
+
+  def stop(self):
+    self.threaded_health_checker.stop()
 
 class HealthCheckerProvider(StatusCheckerProvider):
   def from_assigned_task(self, assigned_task, _):
@@ -103,7 +146,7 @@ class HealthCheckerProvider(StatusCheckerProvider):
     http_signaler = HttpSignaler(
         portmap['health'],
         timeout_secs=health_check_config.get('timeout_secs'))
-    health_checker = HealthCheckerThread(
+    health_checker = HealthChecker(
         http_signaler.health,
         interval_secs=health_check_config.get('interval_secs'),
         initial_interval_secs=health_check_config.get('initial_interval_secs'),

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9a602a77/src/test/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/BUILD b/src/test/python/apache/aurora/executor/common/BUILD
index 3229fac..7d89340 100644
--- a/src/test/python/apache/aurora/executor/common/BUILD
+++ b/src/test/python/apache/aurora/executor/common/BUILD
@@ -24,6 +24,15 @@ python_test_suite(
   ]
 )
 
+python_library(
+  name = 'fixtures',
+  sources = 'fixtures.py',
+  dependencies = [
+    pants('src/main/python/apache/aurora/config/schema'),
+    pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
+  ],
+)
+
 python_tests(
   name = 'announcer',
   sources = ['test_announcer.py'],
@@ -49,10 +58,13 @@ python_tests(
   name = 'health_checker',
   sources = ['test_health_checker.py'],
   dependencies = [
+    pants(':fixtures'),
     pants('3rdparty/python:mesos.interface'),
-    pants('3rdparty/python:mox'),
+    pants('3rdparty/python:mock'),
     pants('3rdparty/python:twitter.common.testing'),
+    pants('src/main/python/apache/aurora/common:http_signaler'),
     pants('src/main/python/apache/aurora/executor/common:health_checker'),
+    pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
   ]
 )
 
@@ -79,8 +91,8 @@ python_tests(
   name = 'task_info',
   sources = ['test_task_info.py'],
   dependencies = [
+    pants(':fixtures'),
     pants('src/main/python/apache/aurora/executor/common:task_info'),
-    pants('src/main/python/apache/aurora/config/schema'),
     pants('src/main/thrift/org/apache/aurora/gen:py-thrift'),
   ]
 )

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9a602a77/src/test/python/apache/aurora/executor/common/fixtures.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/fixtures.py b/src/test/python/apache/aurora/executor/common/fixtures.py
new file mode 100644
index 0000000..55d7358
--- /dev/null
+++ b/src/test/python/apache/aurora/executor/common/fixtures.py
@@ -0,0 +1,43 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import getpass
+
+from apache.aurora.config.schema.base import (
+    MB,
+    MesosJob,
+    MesosTaskInstance,
+    Process,
+    Resources,
+    Task
+)
+
+BASE_MTI = MesosTaskInstance(instance=0, role=getpass.getuser())
+BASE_TASK = Task(resources=Resources(cpu=1.0, ram=16 * MB, disk=32 * MB))
+
+HELLO_WORLD_TASK_ID = 'hello_world-001'
+HELLO_WORLD = BASE_TASK(
+    name='hello_world',
+    processes=[Process(name='hello_world_{{thermos.task_id}}', cmdline='echo hello world')])
+HELLO_WORLD_MTI = BASE_MTI(task=HELLO_WORLD)
+
+SLEEP60 = BASE_TASK(processes=[Process(name='sleep60', cmdline='sleep 60')])
+SLEEP2 = BASE_TASK(processes=[Process(name='sleep2', cmdline='sleep 2')])
+SLEEP60_MTI = BASE_MTI(task=SLEEP60)
+
+MESOS_JOB = MesosJob(
+  name='does_not_matter',
+  instances=1,
+  role=getpass.getuser(),
+)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9a602a77/src/test/python/apache/aurora/executor/common/test_health_checker.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py
index 490d4c8..aa36415 100644
--- a/src/test/python/apache/aurora/executor/common/test_health_checker.py
+++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py
@@ -12,14 +12,25 @@
 # limitations under the License.
 #
 
+import threading
 import time
 import unittest
 
-import mox
+import mock
 from mesos.interface.mesos_pb2 import TaskState
+from twitter.common.exceptions import ExceptionalThread
 from twitter.common.testing.clock import ThreadedClock
 
-from apache.aurora.executor.common.health_checker import HealthCheckerThread
+from apache.aurora.common.http_signaler import HttpSignaler
+from apache.aurora.config.schema.base import HealthCheckConfig
+from apache.aurora.executor.common.fixtures import HELLO_WORLD, MESOS_JOB
+from apache.aurora.executor.common.health_checker import (
+    HealthChecker,
+    HealthCheckerProvider,
+    ThreadedHealthChecker
+)
+
+from gen.apache.aurora.api.ttypes import AssignedTask, ExecutorConfig, TaskConfig
 
 
 def thread_yield():
@@ -29,23 +40,21 @@ def thread_yield():
 class TestHealthChecker(unittest.TestCase):
   def setUp(self):
     self._clock = ThreadedClock()
-    self._mox = mox.Mox()
-    self._checker = self._mox.CreateMockAnything()
-
-  def expect_health_check(self, status, num_calls=1):
-    for x in range(int(num_calls)):
-      self._checker().AndReturn((status, 'reason'))
+    self._checker = mock.Mock(spec=HttpSignaler)
 
-  def replay(self):
-    self._mox.ReplayAll()
+    self.fake_health_checks = []
+    def mock_health_check():
+      return self.fake_health_checks.pop(0)
+    self._checker.health = mock.Mock(spec=self._checker.health)
+    self._checker.health.side_effect = mock_health_check
 
-  def verify(self):
-    self._mox.VerifyAll()
+  def append_health_checks(self, status, num_calls=1):
+    for i in range(num_calls):
+      self.fake_health_checks.append((status, 'reason'))
 
   def test_initial_interval_2x(self):
-    self.expect_health_check(False)
-    self.replay()
-    hct = HealthCheckerThread(self._checker, interval_secs=5, clock=self._clock)
+    self.append_health_checks(False)
+    hct = HealthChecker(self._checker.health, interval_secs=5, clock=self._clock)
     hct.start()
     thread_yield()
     assert hct.status is None
@@ -57,31 +66,29 @@ class TestHealthChecker(unittest.TestCase):
     thread_yield()
     assert hct.status.status == TaskState.Value('TASK_FAILED')
     hct.stop()
-    self.verify()
+    assert self._checker.health.call_count == 1
 
   def test_initial_interval_whatev(self):
-    self.expect_health_check(False)
-    self.replay()
-    hct = HealthCheckerThread(
-      self._checker,
+    self.append_health_checks(False)
+    hct = HealthChecker(
+      self._checker.health,
       interval_secs=5,
       initial_interval_secs=0,
       clock=self._clock)
     hct.start()
     assert hct.status.status == TaskState.Value('TASK_FAILED')
     hct.stop()
-    self.verify()
+    assert self._checker.health.call_count == 1
 
   def test_consecutive_failures(self):
     '''Verify that a task is unhealthy only after max_consecutive_failures is exceeded'''
     initial_interval_secs = 2
     interval_secs = 1
-    self.expect_health_check(False, num_calls=2)
-    self.expect_health_check(True)
-    self.expect_health_check(False, num_calls=3)
-    self.replay()
-    hct = HealthCheckerThread(
-        self._checker,
+    self.append_health_checks(False, num_calls=2)
+    self.append_health_checks(True)
+    self.append_health_checks(False, num_calls=3)
+    hct = HealthChecker(
+        self._checker.health,
         interval_secs=interval_secs,
         initial_interval_secs=initial_interval_secs,
         max_consecutive_failures=2,
@@ -105,4 +112,92 @@ class TestHealthChecker(unittest.TestCase):
     thread_yield()
     assert hct.status.status == TaskState.Value('TASK_FAILED')
     hct.stop()
-    self.verify()
+    assert self._checker.health.call_count == 6
+
+
+class TestHealthCheckerProvider(unittest.TestCase):
+  def test_from_assigned_task(self):
+    interval_secs = 17
+    initial_interval_secs = 3
+    max_consecutive_failures = 2
+    task_config = TaskConfig(
+        executorConfig=ExecutorConfig(
+            name='thermos',
+            data=MESOS_JOB(
+                task=HELLO_WORLD,
+                health_check_config=HealthCheckConfig(
+                    interval_secs=interval_secs,
+                    initial_interval_secs=initial_interval_secs,
+                    max_consecutive_failures=max_consecutive_failures,
+                    timeout_secs=7
+                )
+            ).json_dumps()
+        )
+    )
+    assigned_task = AssignedTask(task=task_config, instanceId=1, assignedPorts={'health': 9001})
+    health_checker = HealthCheckerProvider().from_assigned_task(assigned_task, None)
+    assert health_checker.threaded_health_checker.interval == interval_secs
+    assert health_checker.threaded_health_checker.initial_interval == initial_interval_secs
+    hct_max_fail = health_checker.threaded_health_checker.max_consecutive_failures
+    assert hct_max_fail == max_consecutive_failures
+
+
+class TestThreadedHealthChecker(unittest.TestCase):
+  def setUp(self):
+    self.signaler = mock.Mock(spec=HttpSignaler)
+    self.signaler.health.return_value = (True, 'Fake')
+    self.initial_interval_secs = 1
+    self.interval_secs = 5
+    self.max_consecutive_failures = 2
+    self.clock = mock.Mock(spec=time)
+    self.threaded_health_checker = ThreadedHealthChecker(
+        self.signaler.health,
+        self.interval_secs,
+        self.initial_interval_secs,
+        self.max_consecutive_failures,
+        self.clock)
+
+  def test_maybe_update_failure_count(self):
+    assert self.threaded_health_checker.current_consecutive_failures == 0
+    assert self.threaded_health_checker.healthy is True
+
+    self.threaded_health_checker._maybe_update_failure_count(True, 'reason')
+    assert self.threaded_health_checker.current_consecutive_failures == 0
+
+    self.threaded_health_checker._maybe_update_failure_count(False, 'reason')
+    assert self.threaded_health_checker.current_consecutive_failures == 1
+    assert self.threaded_health_checker.healthy is True
+
+    self.threaded_health_checker._maybe_update_failure_count(False, 'reason')
+    assert self.threaded_health_checker.current_consecutive_failures == 2
+    assert self.threaded_health_checker.healthy is True
+
+    self.threaded_health_checker._maybe_update_failure_count(False, 'reason')
+    assert self.threaded_health_checker.healthy is False
+    assert self.threaded_health_checker.reason == 'reason'
+
+  @mock.patch('apache.aurora.executor.common.health_checker.ThreadedHealthChecker'
+      '._maybe_update_failure_count',
+      spec=ThreadedHealthChecker._maybe_update_failure_count)
+  def test_run(self, mock_maybe_update_failure_count):
+    mock_is_set = mock.Mock(spec=threading._Event.is_set)
+    self.threaded_health_checker.dead.is_set = mock_is_set
+    liveness = [False, False, True]
+    def is_set():
+      return liveness.pop(0)
+    self.threaded_health_checker.dead.is_set.side_effect = is_set
+    self.threaded_health_checker.run()
+    assert self.clock.sleep.call_count == 3
+    assert mock_maybe_update_failure_count.call_count == 2
+
+  @mock.patch('apache.aurora.executor.common.health_checker.ExceptionalThread.start',
+      spec=ExceptionalThread.start)
+  def test_start(self, mock_start):
+    assert mock_start.call_count == 0
+    self.threaded_health_checker.start()
+    mock_start.assert_called_once_with(self.threaded_health_checker)
+
+  def test_stop(self):
+    assert not self.threaded_health_checker.dead.is_set()
+    self.threaded_health_checker.stop()
+    assert self.threaded_health_checker.dead.is_set()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/9a602a77/src/test/python/apache/aurora/executor/common/test_task_info.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_task_info.py b/src/test/python/apache/aurora/executor/common/test_task_info.py
index 344fd67..c4fe905 100644
--- a/src/test/python/apache/aurora/executor/common/test_task_info.py
+++ b/src/test/python/apache/aurora/executor/common/test_task_info.py
@@ -12,39 +12,11 @@
 # limitations under the License.
 #
 
-import getpass
-
-from apache.aurora.config.schema.base import (
-    MB,
-    MesosJob,
-    MesosTaskInstance,
-    Process,
-    Resources,
-    Task
-)
+from apache.aurora.executor.common.fixtures import BASE_MTI, HELLO_WORLD, HELLO_WORLD_MTI, MESOS_JOB
 from apache.aurora.executor.common.task_info import mesos_task_instance_from_assigned_task
 
 from gen.apache.aurora.api.ttypes import AssignedTask, ExecutorConfig, TaskConfig
 
-BASE_MTI = MesosTaskInstance(instance=0, role=getpass.getuser())
-BASE_TASK = Task(resources=Resources(cpu=1.0, ram=16 * MB, disk=32 * MB))
-
-HELLO_WORLD_TASK_ID = 'hello_world-001'
-HELLO_WORLD = BASE_TASK(
-    name='hello_world',
-    processes=[Process(name='hello_world_{{thermos.task_id}}', cmdline='echo hello world')])
-HELLO_WORLD_MTI = BASE_MTI(task=HELLO_WORLD)
-
-SLEEP60 = BASE_TASK(processes=[Process(name='sleep60', cmdline='sleep 60')])
-SLEEP2 = BASE_TASK(processes=[Process(name='sleep2', cmdline='sleep 2')])
-SLEEP60_MTI = BASE_MTI(task=SLEEP60)
-
-MESOS_JOB = MesosJob(
-  name='does_not_matter',
-  instances=1,
-  role=getpass.getuser(),
-)
-
 
 def test_deserialize_thermos_task():
   task_config = TaskConfig(


Mime
View raw message