aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wick...@apache.org
Subject incubator-aurora git commit: Fix swallowed exceptions in health check test, improve gc executor tests.
Date Mon, 23 Feb 2015 23:43:14 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master 9fab0f113 -> 1449a201d


Fix swallowed exceptions in health check test, improve gc executor tests.

One of the health checkers was raising uncaught exceptions on a separate
thread, where they were silently swallowed.  This change also makes sure
that clock.converge is called right after the thread starts, and fixes
some mixing of fake and real clocks in the gc executor tests, which drops
test times to under 4s.

Reviewed at https://reviews.apache.org/r/31251/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/1449a201
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/1449a201
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/1449a201

Branch: refs/heads/master
Commit: 1449a201d4505120c3b2ae9dc593fd6824346c93
Parents: 9fab0f1
Author: Brian Wickman <wickman@apache.org>
Authored: Mon Feb 23 15:43:09 2015 -0800
Committer: Brian Wickman <wickman@apache.org>
Committed: Mon Feb 23 15:43:09 2015 -0800

----------------------------------------------------------------------
 .../aurora/executor/common/health_checker.py    |  12 +-
 .../executor/common/test_health_checker.py      |  60 +++++----
 .../apache/aurora/executor/test_gc_executor.py  | 126 +++++++++++--------
 3 files changed, 121 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1449a201/src/main/python/apache/aurora/executor/common/health_checker.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/health_checker.py b/src/main/python/apache/aurora/executor/common/health_checker.py
index 0d3365d..cfc29c3 100644
--- a/src/main/python/apache/aurora/executor/common/health_checker.py
+++ b/src/main/python/apache/aurora/executor/common/health_checker.py
@@ -15,6 +15,7 @@
 import os.path
 import threading
 import time
+import traceback
 
 from mesos.interface.mesos_pb2 import TaskState
 from twitter.common import log
@@ -89,7 +90,13 @@ class ThreadedHealthChecker(ExceptionalThread):
 
     self.snoozed = False
     log.debug("Health checks enabled. Performing health check.")
-    return self.checker()
+
+    try:
+      return self.checker()
+    except Exception as e:
+      log.error('Internal error in health check:')
+      log.error(traceback.format_exc())
+      return False, 'Internal health check error: %s' % e
 
   def _maybe_update_failure_count(self, is_healthy, reason):
     if not is_healthy:
@@ -106,7 +113,8 @@ class ThreadedHealthChecker(ExceptionalThread):
 
   def run(self):
     log.debug('Health checker thread started.')
-    self.clock.sleep(self.initial_interval)
+    if self.initial_interval > 0:
+      self.clock.sleep(self.initial_interval)
     log.debug('Initial interval expired.')
     while not self.dead.is_set():
       is_healthy, reason = self._perform_check_if_not_disabled()

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1449a201/src/test/python/apache/aurora/executor/common/test_health_checker.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_health_checker.py b/src/test/python/apache/aurora/executor/common/test_health_checker.py
index 4e09d30..1b4423a 100644
--- a/src/test/python/apache/aurora/executor/common/test_health_checker.py
+++ b/src/test/python/apache/aurora/executor/common/test_health_checker.py
@@ -55,30 +55,31 @@ class TestHealthChecker(unittest.TestCase):
     self.append_health_checks(False)
     hct = HealthChecker(self._checker.health, interval_secs=5, clock=self._clock)
     hct.start()
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    assert self._clock.converge(threads=[hct.threaded_health_checker])
     self._clock.assert_waiting(hct.threaded_health_checker, 10)
     assert hct.status is None
     self._clock.tick(6)
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    assert self._clock.converge(threads=[hct.threaded_health_checker])
     assert hct.status is None
     self._clock.tick(3)
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    assert self._clock.converge(threads=[hct.threaded_health_checker])
     assert hct.status is None
     self._clock.tick(5)
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    assert self._clock.converge(threads=[hct.threaded_health_checker])
     assert hct.status.status == TaskState.Value('TASK_FAILED')
     hct.stop()
     assert self._checker.health.call_count == 1
 
   def test_initial_interval_whatev(self):
-    self.append_health_checks(False)
+    self.append_health_checks(False, 2)
     hct = HealthChecker(
         self._checker.health,
         interval_secs=5,
         initial_interval_secs=0,
         clock=self._clock)
     hct.start()
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=5)
     assert hct.status.status == TaskState.Value('TASK_FAILED')
     hct.stop()
     # this is an implementation detail -- we healthcheck in the initializer and
@@ -100,32 +101,40 @@ class TestHealthChecker(unittest.TestCase):
         max_consecutive_failures=2,
         clock=self._clock)
     hct.start()
+    self._clock.converge(threads=[hct.threaded_health_checker])
 
     # 2 consecutive health check failures followed by a successful health check.
-    self._clock.tick(initial_interval_secs)
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    epsilon = 0.001
+    self._clock.tick(initial_interval_secs + epsilon)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
     assert hct.status is None
     assert hct.metrics.sample()['consecutive_failures'] == 1
-    self._clock.tick(interval_secs)
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    self._clock.tick(interval_secs + epsilon)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
     assert hct.status is None
     assert hct.metrics.sample()['consecutive_failures'] == 2
-    self._clock.tick(interval_secs)
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    self._clock.tick(interval_secs + epsilon)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
     assert hct.status is None
     assert hct.metrics.sample()['consecutive_failures'] == 0
 
     # 3 consecutive health check failures.
-    self._clock.tick(interval_secs)
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    self._clock.tick(interval_secs + epsilon)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
     assert hct.status is None
     assert hct.metrics.sample()['consecutive_failures'] == 1
-    self._clock.tick(interval_secs)
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    self._clock.tick(interval_secs + epsilon)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
     assert hct.status is None
     assert hct.metrics.sample()['consecutive_failures'] == 2
-    self._clock.tick(interval_secs)
-    assert self._clock.converge(threads=[hct.threaded_health_checker], timeout=1)
+    self._clock.tick(interval_secs + epsilon)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
     assert hct.status.status == TaskState.Value('TASK_FAILED')
     assert hct.metrics.sample()['consecutive_failures'] == 3
     hct.stop()
@@ -137,26 +146,35 @@ class TestHealthChecker(unittest.TestCase):
       return (True, None)
     hct = HealthChecker(slow_check, interval_secs=1, initial_interval_secs=1, clock=self._clock)
     hct.start()
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)
+
     assert hct._total_latency == 0
     assert hct.metrics.sample()['total_latency_secs'] == 0
 
     # start the health check (during health check it is still 0)
-    self._clock.tick(1.0)
+    epsilon = 0.001
+    self._clock.tick(1.0 + epsilon)
     self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=0.5)
     assert hct._total_latency == 0
     assert hct.metrics.sample()['total_latency_secs'] == 0
     assert hct.metrics.sample()['checks'] == 0
 
     # finish the health check
-    self._clock.tick(0.5)
+    self._clock.tick(0.5 + epsilon)
     self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)  # interval_secs
     assert hct._total_latency == 0.5
     assert hct.metrics.sample()['total_latency_secs'] == 0.5
     assert hct.metrics.sample()['checks'] == 1
 
     # tick again
-    self._clock.tick(1.5)
+    self._clock.tick(1.0 + epsilon)
+    self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.tick(0.5 + epsilon)
     self._clock.converge(threads=[hct.threaded_health_checker])
+    self._clock.assert_waiting(hct.threaded_health_checker, amount=1)  # interval_secs
     assert hct._total_latency == 1.0
     assert hct.metrics.sample()['total_latency_secs'] == 1.0
     assert hct.metrics.sample()['checks'] == 2

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1449a201/src/test/python/apache/aurora/executor/test_gc_executor.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/test_gc_executor.py b/src/test/python/apache/aurora/executor/test_gc_executor.py
index 27dee7f..3a0ff64 100644
--- a/src/test/python/apache/aurora/executor/test_gc_executor.py
+++ b/src/test/python/apache/aurora/executor/test_gc_executor.py
@@ -56,10 +56,9 @@ FINISHED_TASKS = {
 # TODO(wickman) These should be constant sets in the Thermos thrift
 THERMOS_LIVES = (TaskState.ACTIVE, TaskState.CLEANING, TaskState.FINALIZING)
 THERMOS_TERMINALS = (TaskState.SUCCESS, TaskState.FAILED, TaskState.KILLED, TaskState.LOST)
-
 STARTING_STATES = (ScheduleStatus.STARTING, ScheduleStatus.ASSIGNED)
-
 TASK_ID = 'gc_executor_task_id'
+EVENT_WAIT_TIMEOUT_SECS = 10
 
 
 if 'THERMOS_DEBUG' in os.environ:
@@ -131,7 +130,7 @@ class FakeExecutorDetector(object):
 
 
 class ThinTestThermosGCExecutor(ThermosGCExecutor):
-  POLL_WAIT = Amount(1, Time.MICROSECONDS)
+  POLL_WAIT = Amount(5, Time.MILLISECONDS)
   MINIMUM_KILL_AGE = Amount(5, Time.SECONDS)
 
   def __init__(self, checkpoint_root, active_executors=[]):
@@ -303,15 +302,16 @@ def test_real_get_states():
       assert executor.get_sandbox(task) is not None
 
 
-def wait_until_not(thing, clock=time, timeout=1.0):
+def wait_until_not(thing, timeout=EVENT_WAIT_TIMEOUT_SECS):
   """wait until something is booleany False"""
   def wait():
     while thing():
-      clock.sleep(1.0)
+      time.sleep(0.1)
   try:
     deadline(wait, timeout=timeout, daemon=True)
+    return True
   except Timeout:
-    pass
+    return False
 
 
 def run_gc_with(active_executors, retained_tasks, lose=False):
@@ -319,14 +319,17 @@ def run_gc_with(active_executors, retained_tasks, lose=False):
   with temporary_dir() as td:
     setup_tree(td, lose=lose)
     executor = ThinTestThermosGCExecutor(td, active_executors=active_executors)
-    executor.registered(proxy_driver, None, None, None)
-    executor.start()
-    art = AdjustRetainedTasks(retainedTasks=retained_tasks)
-    executor.launchTask(proxy_driver, serialize_art(art, TASK_ID))
-    wait_until_not(lambda: executor._gc_task_queue, clock=executor._clock)
-    wait_until_not(lambda: executor._task_id, clock=executor._clock)
-    assert len(executor._gc_task_queue) == 0
-    assert not executor._task_id
+    try:
+      executor.registered(proxy_driver, None, None, None)
+      executor.start()
+      art = AdjustRetainedTasks(retainedTasks=retained_tasks)
+      executor.launchTask(proxy_driver, serialize_art(art, TASK_ID))
+      assert wait_until_not(lambda: executor._gc_task_queue)
+      assert wait_until_not(lambda: executor._task_id)
+      assert len(executor._gc_task_queue) == 0
+      assert not executor._task_id
+    finally:
+      executor.shutdown(proxy_driver)
   assert len(proxy_driver.updates) >= 1
   if not lose:  # if the task is lost it will be cleaned out of band (by clean_orphans),
                 # so we don't care when the GC task actually finishes
@@ -402,26 +405,33 @@ def test_gc_killtask_noop():
   proxy_driver = ProxyDriver()
   with temporary_dir() as td:
     executor = ThinTestThermosGCExecutor(td)
-    executor.registered(proxy_driver, None, None, None)
-    executor.start()
-    executor.killTask(proxy_driver, TASK_ID)
-  assert not proxy_driver.stopped.is_set()
-  assert len(proxy_driver.updates) == 0
+    try:
+      executor.registered(proxy_driver, None, None, None)
+      executor.start()
+      executor.killTask(proxy_driver, TASK_ID)
+      assert not proxy_driver.stopped.is_set()
+      assert len(proxy_driver.updates) == 0
+    finally:
+      executor.shutdown(proxy_driver)
 
 
 def test_gc_killtask_current():
   proxy_driver = ProxyDriver()
   with temporary_dir() as td:
     executor = build_blocking_gc_executor(td, proxy_driver)
-    executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks()))
-    wait_until_not(lambda: executor._gc_task_queue, clock=executor._clock)
-    assert len(executor._gc_task_queue) == 0
-    assert executor._task_id == TASK_ID
-    executor.killTask(proxy_driver, TASK_ID)
-    assert executor._task_id == TASK_ID
-    assert len(executor._gc_task_queue) == 0
-  assert not proxy_driver.stopped.is_set()
-  assert len(proxy_driver.updates) == 0
+
+    try:
+      executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks()))
+      assert wait_until_not(lambda: len(executor._gc_task_queue))
+      assert len(executor._gc_task_queue) == 0
+      assert executor._task_id == TASK_ID
+      executor.killTask(proxy_driver, TASK_ID)
+      assert executor._task_id == TASK_ID
+      assert len(executor._gc_task_queue) == 0
+      assert not proxy_driver.stopped.is_set()
+      assert len(proxy_driver.updates) == 0
+    finally:
+      executor.shutdown(proxy_driver)
 
 
 def test_gc_killtask_queued():
@@ -429,16 +439,19 @@ def test_gc_killtask_queued():
   proxy_driver = ProxyDriver()
   with temporary_dir() as td:
     executor = build_blocking_gc_executor(td, proxy_driver)
-    executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks()))
-    thread_yield()
-    executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks(), task_id=TASK2_ID))
-    thread_yield()
-    assert len(executor._gc_task_queue) == 1
-    executor.killTask(proxy_driver, TASK2_ID)
-    thread_yield()
-    assert len(executor._gc_task_queue) == 0
-  assert not proxy_driver.stopped.is_set()
-  assert len(proxy_driver.updates) == 0
+    try:
+      executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks()))
+      thread_yield()
+      executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks(), task_id=TASK2_ID))
+      thread_yield()
+      assert len(executor._gc_task_queue) == 1
+      executor.killTask(proxy_driver, TASK2_ID)
+      thread_yield()
+      assert len(executor._gc_task_queue) == 0
+      assert not proxy_driver.stopped.is_set()
+      assert len(proxy_driver.updates) == 0
+    finally:
+      executor.shutdown(proxy_driver)
 
 
 def test_gc_multiple_launchtasks():
@@ -446,17 +459,21 @@ def test_gc_multiple_launchtasks():
   proxy_driver = ProxyDriver()
   with temporary_dir() as td:
     executor = build_blocking_gc_executor(td, proxy_driver)
-    executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks()))
-    thread_yield()
-    executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks(), task_id=TASK2))
-    thread_yield()
-    assert len(executor._gc_task_queue) == 1
-    executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks(), task_id=TASK3))
-    thread_yield()
-    assert len(executor._gc_task_queue) == 1
-  assert not proxy_driver.stopped.is_set()
-  assert len(proxy_driver.updates) >= 1
-  assert StatusUpdate(mesos_pb2.TASK_FINISHED, TASK2) in proxy_driver.updates
+
+    try:
+      executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks()))
+      thread_yield()
+      executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks(), task_id=TASK2))
+      thread_yield()
+      assert len(executor._gc_task_queue) == 1
+      executor.launchTask(proxy_driver, serialize_art(AdjustRetainedTasks(), task_id=TASK3))
+      thread_yield()
+      assert len(executor._gc_task_queue) == 1
+      assert not proxy_driver.stopped.is_set()
+      assert len(proxy_driver.updates) >= 1
+      assert StatusUpdate(mesos_pb2.TASK_FINISHED, TASK2) in proxy_driver.updates
+    finally:
+      executor.shutdown(proxy_driver)
 
 
 def test_gc_shutdown():
@@ -466,9 +483,9 @@ def test_gc_shutdown():
     executor.registered(proxy_driver, None, None, None)
     executor.start()
     executor.shutdown(proxy_driver)
-    executor._stop_event.wait(timeout=1.0)
+    executor._stop_event.wait(timeout=EVENT_WAIT_TIMEOUT_SECS)
     assert executor._stop_event.is_set()
-  proxy_driver.stopped.wait(timeout=1.0)
+  proxy_driver.stopped.wait(timeout=EVENT_WAIT_TIMEOUT_SECS)
   assert proxy_driver.stopped.is_set()
   assert len(proxy_driver.updates) == 0
 
@@ -499,13 +516,14 @@ def run_gc_with_timeout(**kw):
     executor.registered(proxy_driver, None, None, None)
     executor.start()
     yield (proxy_driver, executor)
+    executor.shutdown(proxy_driver)
 
 
 def test_gc_lifetime():
   with run_gc_with_timeout(maximum_executor_lifetime=Amount(500, Time.MILLISECONDS)) as (
       proxy_driver, executor):
     executor._clock.tick(1)
-    proxy_driver.stopped.wait(timeout=1.0)
+    proxy_driver.stopped.wait(timeout=EVENT_WAIT_TIMEOUT_SECS)
     assert proxy_driver.stopped.is_set()
     assert not executor._stop_event.is_set()
 
@@ -525,7 +543,7 @@ class TestRealGC(unittest.TestCase):
     """Set up the checkpoint stream for the given task in the given checkpoint root, optionally
     finished and/or with a corrupt stream"""
     class FastTaskRunner(TaskRunner):
-      COORDINATOR_INTERVAL_SLEEP = Amount(1, Time.MICROSECONDS)
+      COORDINATOR_INTERVAL_SLEEP = Amount(1, Time.MILLISECONDS)
     tr = FastTaskRunner(
         task=task,
         checkpoint_root=root,
@@ -565,7 +583,7 @@ class TestRealGC(unittest.TestCase):
         pass
 
     class FastThermosGCExecutor(ThermosGCExecutor):
-      POLL_WAIT = Amount(1, Time.MICROSECONDS)
+      POLL_WAIT = Amount(1, Time.MILLISECONDS)
 
     detector = functools.partial(FakeExecutorDetector, task_id) if retain else FakeExecutorDetector
     executor = FastThermosGCExecutor(


Mime
View raw message