From: emblemparade@apache.org
To: commits@ariatosca.incubator.apache.org
Date: Mon, 03 Jul 2017 19:56:34 -0000
Subject: [17/31] incubator-ariatosca git commit: ARIA-285 Cancel execution may leave running processes

ARIA-285 Cancel execution may leave running processes

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/7bba3ab1
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/7bba3ab1
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/7bba3ab1

Branch: refs/heads/ARIA-260-send-interface-inputs
Commit: 7bba3ab13e344425e86b14ff708030f76061d3f7
Parents: 807db30
Author: max-orlov
Authored: Sun Jun 25 12:19:02 2017 +0300
Committer: max-orlov
Committed: Tue Jun 27 17:18:48 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/engine.py      | 10 ++++
 aria/orchestrator/workflows/executor/base.py    |  9 +++-
 aria/orchestrator/workflows/executor/celery.py  | 16 +++---
 aria/orchestrator/workflows/executor/process.py | 51 +++++++++++++++----
 requirements.in                                 |  1 +
 requirements.txt                                |  1 +
 .../orchestrator/workflows/executor/__init__.py |  7 +++
 .../workflows/executor/test_process_executor.py | 52 +++++++++++++++++++-
 tests/requirements.txt                          |  1 +
 9 files changed, 129 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7bba3ab1/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d52ae85..5a94df8 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -66,13 +66,23 @@ class Engine(logger.LoggerMixin):
                 else:
                     time.sleep(0.1)
             if cancel:
+                self._terminate_tasks(tasks_tracker.executing_tasks)
                 events.on_cancelled_workflow_signal.send(ctx)
             else:
                 events.on_success_workflow_signal.send(ctx)
         except BaseException as e:
+            # Cleanup any remaining tasks
+            self._terminate_tasks(tasks_tracker.executing_tasks)
             events.on_failure_workflow_signal.send(ctx, exception=e)
             raise
 
+    def _terminate_tasks(self, tasks):
+        for task in tasks:
+            try:
+                self._executors[task._executor].terminate(task.id)
+            except BaseException:
+                pass
+
     @staticmethod
     def cancel_execution(ctx):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7bba3ab1/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 6a3c9d2..4cc4503 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -25,7 +25,7 @@ class BaseExecutor(logger.LoggerMixin):
     """
     Base class for executors for running tasks
     """
-    def _execute(self, task):
+    def _execute(self, ctx):
         raise NotImplementedError
 
     def execute(self, ctx):
@@ -48,6 +48,13 @@ class BaseExecutor(logger.LoggerMixin):
         """
         pass
 
+    def terminate(self, ctx):
+        """
+        Terminate the executing task
+        :return:
+        """
+        pass
+
     @staticmethod
     def _task_started(ctx):
         events.start_task_signal.send(ctx)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7bba3ab1/aria/orchestrator/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py
index 9d66d26..46b15fd 100644
--- a/aria/orchestrator/workflows/executor/celery.py
+++ b/aria/orchestrator/workflows/executor/celery.py
@@ -42,15 +42,15 @@ class CeleryExecutor(BaseExecutor):
         self._receiver_thread.start()
         self._started_queue.get(timeout=30)
 
-    def _execute(self, task):
-        self._tasks[task.id] = task
-        arguments = dict(arg.unwrapped for arg in task.arguments.values())
-        arguments['ctx'] = task.context
-        self._results[task.id] = self._app.send_task(
-            task.operation_mapping,
+    def _execute(self, ctx):
+        self._tasks[ctx.id] = ctx
+        arguments = dict(arg.unwrapped for arg in ctx.arguments.values())
+        arguments['ctx'] = ctx.context
+        self._results[ctx.id] = self._app.send_task(
+            ctx.operation_mapping,
             kwargs=arguments,
-            task_id=task.id,
-            queue=self._get_queue(task))
+            task_id=ctx.id,
+            queue=self._get_queue(ctx))
 
     def close(self):
         self._stopped = True
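
The engine.py and base.py changes above follow a simple pattern: BaseExecutor gains a terminate() hook that is a no-op by default, and the engine calls it for every still-executing task when the workflow is cancelled or fails, swallowing per-task errors so that cleanup stays best-effort. A minimal standalone sketch of that pattern (hypothetical names, not code from this commit):

    class NoopExecutor(object):
        """Mirrors BaseExecutor's default: terminate() does nothing unless a subclass overrides it."""
        def terminate(self, task_id):
            pass

    def terminate_all(executors, executing_tasks):
        # Best-effort cleanup: a failure while terminating one task must not
        # prevent the remaining tasks from being terminated.
        for task in executing_tasks:
            try:
                executors[task.executor_name].terminate(task.id)
            except BaseException:
                pass
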
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7bba3ab1/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 8518b33..11e3cfd 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -25,6 +25,10 @@ import sys
 # As part of the process executor implementation, subprocess are started with this module as their
 # entry point. We thus remove this module's directory from the python path if it happens to be
 # there
+
+import signal
+from collections import namedtuple
+
 script_dir = os.path.dirname(__file__)
 if script_dir in sys.path:
     sys.path.remove(script_dir)
@@ -39,6 +43,7 @@ import tempfile
 import Queue
 import pickle
 
+import psutil
 import jsonpickle
 
 import aria
@@ -57,6 +62,9 @@ UPDATE_TRACKED_CHANGES_FAILED_STR = \
     'Some changes failed writing to storage. For more info refer to the log.'
 
 
+_Task = namedtuple('_Task', 'proc, ctx')
+
+
 class ProcessExecutor(base.BaseExecutor):
     """
     Executor which runs tasks in a subprocess environment
@@ -113,9 +121,26 @@ class ProcessExecutor(base.BaseExecutor):
             self._server_socket.close()
         self._listener_thread.join(timeout=60)
 
+        for task_id in self._tasks:
+            self.terminate(task_id)
+
+    def terminate(self, task_id):
+        task = self._remove_task(task_id)
+        # The process might have managed to finish, thus it would not be in the tasks list
+        if task:
+            try:
+                parent_process = psutil.Process(task.proc.pid)
+                for child_process in reversed(parent_process.children(recursive=True)):
+                    try:
+                        child_process.send_signal(signal.SIGKILL)
+                    except BaseException:
+                        pass
+                parent_process.send_signal(signal.SIGKILL)
+            except BaseException:
+                pass
+
     def _execute(self, ctx):
         self._check_closed()
-        self._tasks[ctx.task.id] = ctx
 
         # Temporary file used to pass arguments to the started subprocess
         file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
@@ -125,13 +150,18 @@
 
         env = self._construct_subprocess_env(task=ctx.task)
         # Asynchronously start the operation in a subprocess
-        subprocess.Popen(
-            '{0} {1} {2}'.format(sys.executable, __file__, arguments_json_path),
-            env=env,
-            shell=True)
+        proc = subprocess.Popen(
+            [
+                sys.executable,
+                os.path.expanduser(os.path.expandvars(__file__)),
+                os.path.expanduser(os.path.expandvars(arguments_json_path))
+            ],
+            env=env)
+
+        self._tasks[ctx.task.id] = _Task(ctx=ctx, proc=proc)
 
     def _remove_task(self, task_id):
-        return self._tasks.pop(task_id)
+        return self._tasks.pop(task_id, None)
 
     def _check_closed(self):
         if self._stopped:
@@ -191,15 +221,18 @@
         _send_message(connection, response)
 
     def _handle_task_started_request(self, task_id, **kwargs):
-        self._task_started(self._tasks[task_id])
+        self._task_started(self._tasks[task_id].ctx)
 
     def _handle_task_succeeded_request(self, task_id, **kwargs):
         task = self._remove_task(task_id)
-        self._task_succeeded(task)
+        if task:
+            self._task_succeeded(task.ctx)
 
     def _handle_task_failed_request(self, task_id, request, **kwargs):
         task = self._remove_task(task_id)
-        self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
+        if task:
+            self._task_failed(
+                task.ctx, exception=request['exception'], traceback=request['traceback'])
 
 
 def _send_message(connection, message):
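
The heart of the fix is the new ProcessExecutor.terminate() above: the executor now keeps the Popen handle of each task subprocess, and on termination uses psutil to kill the whole process tree, children before the parent, tolerating processes that have already exited. A standalone sketch of that technique (assumes a POSIX platform, since SIGKILL is used; on Windows psutil's Process.kill() would be the closest equivalent):

    import signal

    import psutil

    def kill_process_tree(pid):
        # Kill the deepest descendants first, then work back up to the parent.
        try:
            parent = psutil.Process(pid)
        except psutil.NoSuchProcess:
            return  # the process already finished on its own
        for child in reversed(parent.children(recursive=True)):
            try:
                child.send_signal(signal.SIGKILL)
            except psutil.NoSuchProcess:
                pass
        try:
            parent.send_signal(signal.SIGKILL)
        except psutil.NoSuchProcess:
            pass
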
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7bba3ab1/requirements.in
----------------------------------------------------------------------
diff --git a/requirements.in b/requirements.in
index cecc9fd..723ed51 100644
--- a/requirements.in
+++ b/requirements.in
@@ -33,6 +33,7 @@ PrettyTable>=0.7,<0.8
 click_didyoumean==0.0.3
 backports.shutil_get_terminal_size==1.0.0
 logutils==0.3.4.1
+psutil>=5.2.2, < 6.0.0
 importlib ; python_version < '2.7'
 ordereddict ; python_version < '2.7'
 total-ordering ; python_version < '2.7'  # only one version on pypi

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7bba3ab1/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index 9f929a9..7ee1008 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -26,6 +26,7 @@ networkx==1.9.1
 ordereddict==1.1 ; python_version < "2.7"
 packaging==16.8  # via setuptools
 prettytable==0.7.2
+psutil==5.2.2
 pyparsing==2.2.0  # via packaging
 requests==2.13.0
 retrying==1.3.3

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7bba3ab1/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 83584a6..99d0b39 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -18,10 +18,13 @@ from contextlib import contextmanager
 
 import aria
 from aria.modeling import models
+from aria.orchestrator.context.common import BaseContext
 
 
 class MockContext(object):
 
+    INSTRUMENTATION_FIELDS = BaseContext.INSTRUMENTATION_FIELDS
+
     def __init__(self, storage, task_kwargs=None):
         self.logger = logging.getLogger('mock_logger')
         self._task_kwargs = task_kwargs or {}
@@ -46,6 +49,10 @@ class MockContext(object):
     def close(self):
         pass
 
+    @property
+    def model(self):
+        return self._storage
+
     @classmethod
     def instantiate_from_dict(cls, storage_kwargs=None, task_kwargs=None):
         return cls(storage=aria.application_model_storage(**(storage_kwargs or {})),
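
The test added below runs an operation that deliberately hangs and spawns an extra shell child, publishes the child's pid through a FilesystemDataHolder shared with the test process, and then polls for that pid with the retrying library before asserting that terminate() gets rid of both processes. A minimal sketch of that polling idiom (hypothetical names, not code from this commit):

    import retrying

    # Re-invoke the function every 500 ms while it keeps returning False,
    # giving up with an error after 60 seconds.
    @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
    def wait_for_subprocess_pid(holder):
        return holder.get('subproc', False)
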
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7bba3ab1/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 755b9be..6f5c827 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -14,17 +14,24 @@
 # limitations under the License.
 
 import os
+import time
 import Queue
+import subprocess
 
 import pytest
+import psutil
+import retrying
 
 import aria
+from aria import operation
+from aria.modeling import models
 from aria.orchestrator import events
 from aria.utils.plugin import create as create_plugin
 from aria.orchestrator.workflows.executor import process
 
 import tests.storage
 import tests.resources
+from tests.helpers import FilesystemDataHolder
 from tests.fixtures import (  # pylint: disable=unused-import
     plugins_dir,
     plugin_manager,
@@ -71,10 +78,45 @@ class TestProcessExecutor(object):
         executor.execute(MockContext(model, task_kwargs=dict(function='some.function')))
         assert 'closed' in exc_info.value.message
 
+    def test_process_termination(self, executor, model, fs_test_holder):
+        argument = models.Argument.wrap('holder_path', fs_test_holder._path)
+        model.argument.put(argument)
+        ctx = MockContext(
+            model,
+            task_kwargs=dict(
+                function='{0}.{1}'.format(__name__, freezing_task.__name__),
+                arguments=dict(holder_path=argument)),
+        )
+
+        executor.execute(ctx)
+
+        @retrying.retry(retry_on_result=lambda r: r is False, stop_max_delay=60000, wait_fixed=500)
+        def wait_for_extra_process_id():
+            return fs_test_holder.get('subproc', False)
+
+        pids = [executor._tasks[ctx.task.id].proc.pid, wait_for_extra_process_id()]
+        assert any(p.pid == pid for p in psutil.process_iter() for pid in pids)
+        executor.terminate(ctx.task.id)
+
+        # Give a chance to the processes to terminate
+        time.sleep(2)
+        assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE
+                       for p in psutil.process_iter()
+                       for pid in pids)
+
+
+@pytest.fixture
+def fs_test_holder(tmpdir):
+    dataholder_path = str(tmpdir.join('dataholder'))
+    holder = FilesystemDataHolder(dataholder_path)
+    return holder
+
 
 @pytest.fixture
 def executor(plugin_manager):
-    result = process.ProcessExecutor(plugin_manager=plugin_manager)
+    result = process.ProcessExecutor(
+        plugin_manager=plugin_manager,
+        python_path=[tests.ROOT_DIR])
     yield result
     result.close()
 
@@ -92,3 +134,11 @@
         initiator_kwargs=dict(base_dir=str(tmpdir)))
     yield _storage
     tests.storage.release_sqlite_storage(_storage)
+
+
+@operation
+def freezing_task(holder_path, **_):
+    holder = FilesystemDataHolder(holder_path)
+    holder['subproc'] = subprocess.Popen('while true; do sleep 5; done', shell=True).pid
+    while True:
+        time.sleep(5)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7bba3ab1/tests/requirements.txt
----------------------------------------------------------------------
diff --git a/tests/requirements.txt b/tests/requirements.txt
index 71a227a..cf57821 100644
--- a/tests/requirements.txt
+++ b/tests/requirements.txt
@@ -13,6 +13,7 @@ testtools
 fasteners==0.13.0
 sh==1.12.13
+psutil==5.2.2
 mock==1.0.1
 pylint==1.6.4
 pytest==3.0.2
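
A detail worth noting in the process.py change above: the task subprocess is now started with an argument list instead of a shell=True command string. One likely motivation, given the new terminate() logic, is that with a shell in between the pid recorded by Popen may belong to the shell wrapper rather than to the Python worker (platform- and shell-dependent), whereas with an argument list the recorded pid is the worker itself, so the psutil tree kill can reach it and everything it spawned. A small illustration of the difference (not from this commit):

    import subprocess
    import sys

    # Command string + shell=True: the command runs via the shell, so proc.pid
    # may be the shell's pid rather than the Python process's.
    proc_via_shell = subprocess.Popen('{0} -c "print(1)"'.format(sys.executable), shell=True)
    proc_via_shell.wait()

    # Argument list, no shell: proc.pid is the Python process itself.
    proc_direct = subprocess.Popen([sys.executable, '-c', 'print(1)'])
    proc_direct.wait()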