aurora-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From ma...@apache.org
Subject aurora git commit: Making observer polling interval configurable.
Date Tue, 16 Jun 2015 23:26:57 GMT
Repository: aurora
Updated Branches:
  refs/heads/master ea2c9ad24 -> 8def0a90d


Making observer polling interval configurable.

Bugs closed: AURORA-1351

Reviewed at https://reviews.apache.org/r/35527/


Project: http://git-wip-us.apache.org/repos/asf/aurora/repo
Commit: http://git-wip-us.apache.org/repos/asf/aurora/commit/8def0a90
Tree: http://git-wip-us.apache.org/repos/asf/aurora/tree/8def0a90
Diff: http://git-wip-us.apache.org/repos/asf/aurora/diff/8def0a90

Branch: refs/heads/master
Commit: 8def0a90d9f402eca0a25e51f1bc1022fbce7bfc
Parents: ea2c9ad
Author: Maxim Khutornenko <maxim@apache.org>
Authored: Tue Jun 16 16:09:10 2015 -0700
Committer: Maxim Khutornenko <maxim@apache.org>
Committed: Tue Jun 16 16:09:10 2015 -0700

----------------------------------------------------------------------
 src/main/python/apache/aurora/tools/__init__.py | 15 +++++++
 .../apache/aurora/tools/thermos_observer.py     | 18 +++++++-
 .../apache/thermos/observer/task_observer.py    |  9 ++--
 src/test/python/apache/aurora/BUILD             |  1 +
 src/test/python/apache/aurora/tools/BUILD       | 27 ++++++++++++
 .../tools/test_thermos_observer_entry_point.py  | 40 +++++++++++++++++
 .../thermos/observer/test_task_observer.py      | 45 ++++++++++++++++++++
 7 files changed, 150 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/main/python/apache/aurora/tools/__init__.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/tools/__init__.py b/src/main/python/apache/aurora/tools/__init__.py
new file mode 100644
index 0000000..e2f963e
--- /dev/null
+++ b/src/main/python/apache/aurora/tools/__init__.py
@@ -0,0 +1,15 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+__import__('pkg_resources').declare_namespace(__name__)

http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/main/python/apache/aurora/tools/thermos_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/aurora/tools/thermos_observer.py b/src/main/python/apache/aurora/tools/thermos_observer.py
index 4b534d3..d88c03f 100644
--- a/src/main/python/apache/aurora/tools/thermos_observer.py
+++ b/src/main/python/apache/aurora/tools/thermos_observer.py
@@ -19,6 +19,7 @@ import time
 from twitter.common import app
 from twitter.common.exceptions import ExceptionalThread
 from twitter.common.log.options import LogOptions
+from twitter.common.quantity import Amount, Time
 
 from apache.aurora.executor.common.path_detector import MesosPathDetector
 from apache.thermos.common.constants import DEFAULT_CHECKPOINT_ROOT
@@ -50,18 +51,31 @@ app.add_option(
     help='The port on which the observer should listen.')
 
 
+app.add_option(
+    '--polling_interval_secs',
+      dest='polling_interval_secs',
+      type='int',
+      default=TaskObserver.POLLING_INTERVAL.as_(Time.SECONDS),
+      help='The number of seconds between observer refresh attempts.')
+
+
 # Allow an interruptible sleep so that ^C works.
 def sleep_forever():
   while True:
     time.sleep(1)
 
 
-def main(_, options):
+def initialize(options):
   path_detector = ChainedPathDetector(
       FixedPathDetector(options.root),
       MesosPathDetector(options.mesos_root),
   )
-  observer = TaskObserver(path_detector)
+  polling_interval = Amount(options.polling_interval_secs, Time.SECONDS)
+  return TaskObserver(path_detector, interval=polling_interval)
+
+
+def main(_, options):
+  observer = initialize(options)
   observer.start()
   root_server = configure_server(observer)
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/main/python/apache/thermos/observer/task_observer.py
----------------------------------------------------------------------
diff --git a/src/main/python/apache/thermos/observer/task_observer.py b/src/main/python/apache/thermos/observer/task_observer.py
index b9a37de..1485de8 100644
--- a/src/main/python/apache/thermos/observer/task_observer.py
+++ b/src/main/python/apache/thermos/observer/task_observer.py
@@ -21,7 +21,6 @@ polls a designated Thermos checkpoint root and collates information about
all ta
 """
 import os
 import threading
-import time
 from operator import attrgetter
 
 from twitter.common import log
@@ -54,7 +53,10 @@ class TaskObserver(ExceptionalThread, Lockable):
 
   POLLING_INTERVAL = Amount(5, Time.SECONDS)
 
-  def __init__(self, path_detector, resource_monitor_class=TaskResourceMonitor):
+  def __init__(self,
+               path_detector,
+               resource_monitor_class=TaskResourceMonitor,
+               interval=POLLING_INTERVAL):
     self._detector = ObserverTaskDetector(
         path_detector,
         self.__on_active,
@@ -63,6 +65,7 @@ class TaskObserver(ExceptionalThread, Lockable):
     if not issubclass(resource_monitor_class, ResourceMonitorBase):
       raise ValueError("resource monitor class must implement ResourceMonitorBase!")
     self._resource_monitor_class = resource_monitor_class
+    self._interval = interval
     self._active_tasks = {}    # task_id => ActiveObservedTask
     self._finished_tasks = {}  # task_id => FinishedObservedTask
     self._stop_event = threading.Event()
@@ -127,7 +130,7 @@ class TaskObserver(ExceptionalThread, Lockable):
       finished state.
     """
     while not self._stop_event.is_set():
-      time.sleep(self.POLLING_INTERVAL.as_(Time.SECONDS))
+      self._stop_event.wait(self._interval.as_(Time.SECONDS))
       with self.lock:
         self._detector.refresh()
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/test/python/apache/aurora/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/BUILD b/src/test/python/apache/aurora/BUILD
index 0701440..c2251ce 100644
--- a/src/test/python/apache/aurora/BUILD
+++ b/src/test/python/apache/aurora/BUILD
@@ -20,6 +20,7 @@ python_test_suite(
     'src/test/python/apache/aurora/common:all',
     'src/test/python/apache/aurora/config:all',
     'src/test/python/apache/aurora/executor:all',
+    'src/test/python/apache/aurora/tools:all',
   ]
 )
 

http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/test/python/apache/aurora/tools/BUILD
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/tools/BUILD b/src/test/python/apache/aurora/tools/BUILD
new file mode 100644
index 0000000..e676aff
--- /dev/null
+++ b/src/test/python/apache/aurora/tools/BUILD
@@ -0,0 +1,27 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+python_test_suite(name = 'all',
+  dependencies = [
+    ':thermos_observer_entry_point',
+  ]
+)
+
+python_tests(name = 'thermos_observer_entry_point',
+  sources = ['test_thermos_observer_entry_point.py'],
+  dependencies = [
+    '3rdparty/python:mock',
+    'src/main/python/apache/aurora/tools:thermos_observer',
+  ]
+)
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py b/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
new file mode 100644
index 0000000..e485b81
--- /dev/null
+++ b/src/test/python/apache/aurora/tools/test_thermos_observer_entry_point.py
@@ -0,0 +1,40 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import os
+import unittest
+
+from mock import create_autospec, Mock, patch
+from twitter.common.quantity import Amount, Time
+
+from apache.aurora.tools.thermos_observer import initialize
+from apache.thermos.observer.task_observer import TaskObserver
+
+
+class ThermosObserverMainTest(unittest.TestCase):
+  def test_initialize(self):
+    expected_interval = Amount(15, Time.SECONDS)
+    mock_options = Mock(spec_set=['root', 'mesos_root', 'polling_interval_secs'])
+    mock_options.root = ''
+    mock_options.mesos_root = os.path.abspath('.')
+    mock_options.polling_interval_secs = int(expected_interval.as_(Time.SECONDS))
+    mock_task_observer = create_autospec(spec=TaskObserver)
+    with patch(
+        'apache.aurora.tools.thermos_observer.TaskObserver',
+        return_value=mock_task_observer) as mock_observer:
+
+      initialize(mock_options)
+
+      assert len(mock_observer.mock_calls) == 1
+      args = mock_observer.mock_calls[0][2]
+      assert expected_interval == args['interval']

http://git-wip-us.apache.org/repos/asf/aurora/blob/8def0a90/src/test/python/apache/thermos/observer/test_task_observer.py
----------------------------------------------------------------------
diff --git a/src/test/python/apache/thermos/observer/test_task_observer.py b/src/test/python/apache/thermos/observer/test_task_observer.py
new file mode 100644
index 0000000..ace15c5
--- /dev/null
+++ b/src/test/python/apache/thermos/observer/test_task_observer.py
@@ -0,0 +1,45 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+import unittest
+
+from mock import create_autospec, patch
+from twitter.common.quantity import Amount, Time
+
+from apache.thermos.observer.detector import ObserverTaskDetector
+from apache.thermos.observer.task_observer import TaskObserver
+
+
+class TaskObserverTest(unittest.TestCase):
+  def test_run_loop(self):
+    """Test observer run loop."""
+    mock_task_detector = create_autospec(spec=ObserverTaskDetector)
+    with patch(
+        "apache.thermos.observer.task_observer.ObserverTaskDetector",
+        return_value=mock_task_detector) as mock_detector:
+      with patch('threading._Event.wait') as mock_wait:
+
+        run_count = 3
+        interval = 15
+        observer = TaskObserver(mock_detector, interval=Amount(interval, Time.SECONDS))
+        observer.start()
+        while len(mock_wait.mock_calls) < run_count:
+          pass
+
+        observer.stop()
+
+        assert len(mock_task_detector.mock_calls) >= run_count
+        assert len(mock_wait.mock_calls) >= run_count
+        args = mock_wait.mock_calls[1][1]
+        assert interval == args[0]


Mime
View raw message