aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From wick...@apache.org
Subject incubator-aurora git commit: De-flakify resource_manager_integration test
Date Fri, 16 Jan 2015 19:12:29 GMT
Repository: incubator-aurora
Updated Branches:
  refs/heads/master b75ed0f8f -> 1346c4f79


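De-flakify resource_manager_integration test

Replace the test's sleep-and-poll loop (which wrote a real file into the sandbox and waited up to 5 seconds for the disk collector to notice it) with an injected mock DiskCollector. ResourceManagerProvider now accepts disk_collector and disk_collection_interval arguments, and ResourceManager sets a threading.Event when the disk limit is exceeded so the test can wait on it.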

Testing Done:
./pants goal test --no-test-pytest-fast src/test/python/apache/aurora/executor/common::

Reviewed at https://reviews.apache.org/r/29901/


Project: http://git-wip-us.apache.org/repos/asf/incubator-aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-aurora/commit/1346c4f7
Tree: http://git-wip-us.apache.org/repos/asf/incubator-aurora/tree/1346c4f7
Diff: http://git-wip-us.apache.org/repos/asf/incubator-aurora/diff/1346c4f7

Branch: refs/heads/master
Commit: 1346c4f799d39626bad001c5e398fa4786d29319
Parents: b75ed0f
Author: Brian Wickman <wickman@apache.org>
Authored: Fri Jan 16 11:11:37 2015 -0800
Committer: Brian Wickman <wickman@apache.org>
Committed: Fri Jan 16 11:11:37 2015 -0800

----------------------------------------------------------------------
 .../aurora/executor/common/resource_manager.py  | 19 ++++++++--
 .../apache/thermos/monitoring/resource.py       |  7 ++--
 .../python/apache/aurora/executor/common/BUILD  |  1 +
 .../common/test_resource_manager_integration.py | 37 ++++++++++++--------
 4 files changed, 46 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1346c4f7/src/main/python/apache/aurora/executor/common/resource_manager.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/executor/common/resource_manager.py b/src/main/python/apache/aurora/executor/common/resource_manager.py
index bf69e7e..08e02e4 100644
--- a/src/main/python/apache/aurora/executor/common/resource_manager.py
+++ b/src/main/python/apache/aurora/executor/common/resource_manager.py
@@ -12,8 +12,11 @@
 # limitations under the License.
 #
 
+import threading
+
 from mesos.interface import mesos_pb2
 from twitter.common.metrics import LambdaGauge
+from twitter.common.quantity import Amount, Time
 
 from apache.aurora.executor.common.status_checker import (
     StatusChecker,
@@ -22,6 +25,7 @@ from apache.aurora.executor.common.status_checker import (
 )
 from apache.aurora.executor.common.task_info import mesos_task_instance_from_assigned_task
 from apache.thermos.common.path import TaskPath
+from apache.thermos.monitoring.disk import DiskCollector
 from apache.thermos.monitoring.monitor import TaskMonitor
 from apache.thermos.monitoring.resource import TaskResourceMonitor
 
@@ -40,6 +44,7 @@ class ResourceManager(StatusChecker):
     self._max_ram = resources.ram().get()
     self._max_disk = resources.disk().get()
     self._kill_reason = None
+    self._kill_event = threading.Event()
 
   @property
   def _num_procs(self):
@@ -60,6 +65,7 @@ class ResourceManager(StatusChecker):
   def status(self):
     sample = self._disk_sample
     if sample > self._max_disk:
+      self._kill_event.set()
       return StatusResult('Disk limit exceeded.  Reserved %s bytes vs used %s bytes.' % (
           self._max_disk, sample), mesos_pb2.TASK_FAILED)
 
@@ -87,13 +93,22 @@ class ResourceManager(StatusChecker):
 
 
 class ResourceManagerProvider(StatusCheckerProvider):
-  def __init__(self, checkpoint_root):
+  def __init__(self,
+               checkpoint_root,
+               disk_collector=DiskCollector,
+               disk_collection_interval=Amount(1, Time.MINUTES)):
     self._checkpoint_root = checkpoint_root
+    self._disk_collector = disk_collector
+    self._disk_collection_interval = disk_collection_interval
 
   def from_assigned_task(self, assigned_task, sandbox):
     task_id = assigned_task.taskId
     resources = mesos_task_instance_from_assigned_task(assigned_task).task().resources()
     task_path = TaskPath(root=self._checkpoint_root, task_id=task_id)
     task_monitor = TaskMonitor(task_path, task_id)
-    resource_monitor = TaskResourceMonitor(task_monitor, sandbox.root)
+    resource_monitor = TaskResourceMonitor(
+        task_monitor,
+        sandbox.root,
+        disk_collector=self._disk_collector,
+        disk_collection_interval=self._disk_collection_interval)
     return ResourceManager(resources, resource_monitor)

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1346c4f7/src/main/python/apache/thermos/monitoring/resource.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/monitoring/resource.py b/src/main/python/apache/thermos/monitoring/resource.py
index 7b5bbe0..b4cb881 100644
--- a/src/main/python/apache/thermos/monitoring/resource.py
+++ b/src/main/python/apache/thermos/monitoring/resource.py
@@ -118,8 +118,11 @@ class TaskResourceMonitor(ResourceMonitorBase, threading.Thread):
 
   MAX_HISTORY = 10000  # magic number
 
-  def __init__(self, task_monitor, sandbox,
-               process_collector=ProcessTreeCollector, disk_collector=DiskCollector,
+  def __init__(self,
+               task_monitor,
+               sandbox,
+               process_collector=ProcessTreeCollector,
+               disk_collector=DiskCollector,
                process_collection_interval=Amount(20, Time.SECONDS),
                disk_collection_interval=Amount(1, Time.MINUTES),
                history_time=Amount(1, Time.HOURS)):

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1346c4f7/src/test/python/apache/aurora/executor/common/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/BUILD b/src/test/python/apache/aurora/executor/common/BUILD
index 395ced2..9e3a657 100644
--- a/src/test/python/apache/aurora/executor/common/BUILD
+++ b/src/test/python/apache/aurora/executor/common/BUILD
@@ -127,6 +127,7 @@ python_tests(
   sources = [ 'test_resource_manager_integration.py' ],
   dependencies = [
     '3rdparty/python:mesos.interface',
+    '3rdparty/python:mock',
     '3rdparty/python:twitter.common.contextutil',
     '3rdparty/python:twitter.common.dirutil',
     'api/src/main/thrift/org/apache/aurora/gen:py-thrift',

http://git-wip-us.apache.org/repos/asf/incubator-aurora/blob/1346c4f7/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
b/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
index e3c766f..bb30457 100644
--- a/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
+++ b/src/test/python/apache/aurora/executor/common/test_resource_manager_integration.py
@@ -13,14 +13,16 @@
 #
 
 import os
-import time
+import threading
 
+import mock
 from mesos.interface import mesos_pb2
 from twitter.common.contextutil import temporary_dir
-from twitter.common.dirutil import safe_open
+from twitter.common.quantity import Amount, Time
 
 from apache.aurora.executor.common.resource_manager import ResourceManagerProvider
 from apache.aurora.executor.common.sandbox import DirectorySandbox
+from apache.thermos.monitoring.disk import DiskCollector
 
 
 # TODO(jcohen): There should really be a single canonical source for creating test jobs/tasks
@@ -66,7 +68,20 @@ def test_resource_manager():
     sandbox = os.path.join(td, 'sandbox')
     root = os.path.join(td, 'thermos')
 
-    rm = ResourceManagerProvider(root).from_assigned_task(
+    completed_event = threading.Event()
+    completed_event.set()
+    completed_mock = mock.PropertyMock(completed_event)
+    mock_disk_collector = mock.create_autospec(DiskCollector, spec_set=True)
+    mock_disk_collector.sample.return_value = None
+    value_mock = mock.PropertyMock(return_value=4197)
+    type(mock_disk_collector).value = value_mock
+    type(mock_disk_collector).completed_event = completed_mock
+
+    rmp = ResourceManagerProvider(
+        root,
+        disk_collector=lambda sandbox: mock_disk_collector,
+        disk_collection_interval=Amount(1, Time.SECONDS))
+    rm = rmp.from_assigned_task(
         make_assigned_task(
             make_job('some-role', 'some-env', 'some-job', 'http', portmap={'http': 80})),
         DirectorySandbox(sandbox))
@@ -75,20 +90,14 @@ def test_resource_manager():
 
     try:
       rm.start()
-
-      with safe_open(os.path.join(sandbox, 'foo.txt'), 'w') as fp:
-        fp.write(4097 * 'x')
-
-      # N.B. The sleep below makes this test inherently vulnerable to flakiness
-      # TODO(jcohen): Investigate using threading.event to avoid the need for a timeout entirely.
-      timeout = 0
-      while rm.status is None and timeout < 5:
-        timeout += 0.1
-        time.sleep(0.1)
-
+      while rm.status is None:
+        rm._kill_event.wait(timeout=1)
       result = rm.status
       assert result is not None
       assert result.reason.startswith('Disk limit exceeded')
       assert result.status == mesos_pb2.TASK_FAILED
+      assert value_mock.call_count == 1
+      assert completed_mock.call_count == 1
+      assert mock_disk_collector.sample.call_count == 1
     finally:
       rm._resource_monitor.kill()


Mime
View raw message