ariatosca-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: windows wip
Date Sun, 02 Jul 2017 15:00:27 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions caca331ee -> 20c328f36


windows wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/20c328f3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/20c328f3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/20c328f3

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: 20c328f36e0bac4187e0df191c20a17576be12ae
Parents: caca331
Author: mxmrlv <mxmrlv@gmail.com>
Authored: Sun Jul 2 17:50:36 2017 +0300
Committer: mxmrlv <mxmrlv@gmail.com>
Committed: Sun Jul 2 17:50:36 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py |  6 ++--
 .../workflows/executor/test_process_executor.py | 35 ++++++++++++--------
 2 files changed, 25 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/20c328f3/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 11e3cfd..8af700e 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -121,7 +121,7 @@ class ProcessExecutor(base.BaseExecutor):
         self._server_socket.close()
         self._listener_thread.join(timeout=60)
 
-        for task_id in self._tasks:
+        for task_id in set(self._tasks):
             self.terminate(task_id)
 
     def terminate(self, task_id):
@@ -132,10 +132,10 @@ class ProcessExecutor(base.BaseExecutor):
                 parent_process = psutil.Process(task.proc.pid)
                 for child_process in reversed(parent_process.children(recursive=True)):
                     try:
-                        child_process.send_signal(signal.SIGKILL)
+                        child_process.kill()
                     except BaseException:
                         pass
-                parent_process.send_signal(signal.SIGKILL)
+                parent_process.kill()
             except BaseException:
                 pass
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/20c328f3/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 8da0018..c8bebc7 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -23,6 +23,8 @@ import psutil
 import retrying
 
 import aria
+import sys
+
 from aria import operation
 from aria.modeling import models
 from aria.orchestrator import events
@@ -81,19 +83,30 @@ class TestProcessExecutor(object):
             executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
         assert 'closed' in exc_info.value.message
 
-    def test_process_termination(self, executor, model, fs_test_holder):
-        argument = models.Argument.wrap('holder_path', fs_test_holder._path)
-        model.argument.put(argument)
+    def test_process_termination(self, executor, model, fs_test_holder, tmpdir):
+        freeze_script_path = str(tmpdir.join('freeze_script'))
+        with open(freeze_script_path, 'w+b') as f:
+            f.write(
+                '''import time
+while True:
+    time.sleep(5)
+                '''
+            )
+        holder_path_argument = models.Argument.wrap('holder_path', fs_test_holder._path)
+        script_path_argument = models.Argument.wrap('freezing_script_path', str(tmpdir.join('freeze_script')))
+
+        model.argument.put(holder_path_argument)
+        model.argument.put(script_path_argument)
         ctx = MockContext(
             model,
             task_kwargs=dict(
                 function='{0}.{1}'.format(__name__, freezing_task.__name__),
-                arguments=dict(holder_path=argument)),
+                arguments=dict(holder_path=holder_path_argument, freezing_script_path=script_path_argument)),
         )
 
         executor.execute(ctx)
 
-        @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
+        @retrying.retry(retry_on_result=lambda r: r is False)
         def wait_for_extra_process_id():
             return fs_test_holder.get('subproc', False)
 
@@ -104,7 +117,7 @@ class TestProcessExecutor(object):
         executor.terminate(ctx.task.id)
 
         # Give a chance to the processes to terminate
-        time.sleep(10) # windows might require more time
+        time.sleep(2)
 
         # all processes should be either zombies or non existent
         pids = [task_pid, extra_process_pid]
@@ -112,7 +125,7 @@ class TestProcessExecutor(object):
             if pid in psutil.pids():
                 assert psutil.Process(pid).status() == psutil.STATUS_ZOMBIE
             else:
-                # just to making the test more readable
+                # making the test more readable
                 assert pid not in psutil.pids()
 
 @pytest.fixture
@@ -147,12 +160,8 @@ def model(tmpdir):
 
 
 @operation
-def freezing_task(holder_path, **_):
+def freezing_task(holder_path, freezing_script_path, **_):
     holder = FilesystemDataHolder(holder_path)
-    if IS_WINDOWS:
-        command = 'for %a in (0 1 2 3 4 5 6 7 8 9 10) do timeout 60'
-    else:
-        command = 'while true; do sleep 5; done'
-    holder['subproc'] = subprocess.Popen(command, shell=True).pid
+    holder['subproc'] = subprocess.Popen([sys.executable, freezing_script_path], shell=True).pid
     while True:
         time.sleep(5)


Mime
View raw message