Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 1B2A1200CFD for ; Wed, 6 Sep 2017 19:49:03 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 1980D1609C5; Wed, 6 Sep 2017 17:49:03 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 9FB77160D32 for ; Wed, 6 Sep 2017 19:49:00 +0200 (CEST) Received: (qmail 96588 invoked by uid 500); 6 Sep 2017 17:48:59 -0000 Mailing-List: contact commits-help@ariatosca.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ariatosca.incubator.apache.org Delivered-To: mailing list commits@ariatosca.incubator.apache.org Received: (qmail 96579 invoked by uid 99); 6 Sep 2017 17:48:59 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Sep 2017 17:48:59 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 39346CCC5D for ; Wed, 6 Sep 2017 17:48:59 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id 9-ARi4aYYKjm for ; Wed, 6 Sep 2017 17:48:41 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id BB2E26105B for ; Wed, 6 Sep 2017 17:48:14 +0000 (UTC) Received: (qmail 93868 invoked by uid 99); 6 Sep 2017 17:48:12 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 06 Sep 2017 17:48:12 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8AF0DF56C3; Wed, 6 Sep 2017 17:48:11 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: arthurberezin@apache.org To: commits@ariatosca.incubator.apache.org Date: Wed, 06 Sep 2017 17:49:12 -0000 Message-Id: <29d2821c05664d128927c2fd78b348ab@git.apache.org> In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [63/70] incubator-ariatosca-website git commit: Adding Sphinx based Docs minisite archived-at: Wed, 06 Sep 2017 17:49:03 -0000 http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/local.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/local.py b/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/local.py deleted file mode 100644 index 04b9ecd..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/local.py +++ /dev/null @@ -1,128 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Local execution of operations. -""" - -import os -import subprocess -import threading -import StringIO - -from . import ctx_proxy -from . import exceptions -from . import common -from . import constants -from . import environment_globals -from . import python_script_scope - - -def run_script(ctx, script_path, process, **kwargs): - if not script_path: - ctx.task.abort('Missing script_path') - process = process or {} - script_path = common.download_script(ctx, script_path) - script_func = _get_run_script_func(script_path, process) - return script_func( - ctx=ctx, - script_path=script_path, - process=process, - operation_kwargs=kwargs) - - -def _get_run_script_func(script_path, process): - if _treat_script_as_python_script(script_path, process): - return _eval_script_func - else: - if _treat_script_as_powershell_script(script_path): - process.setdefault('command_prefix', constants.DEFAULT_POWERSHELL_EXECUTABLE) - return _execute_func - - -def _treat_script_as_python_script(script_path, process): - eval_python = process.get('eval_python') - script_extension = os.path.splitext(script_path)[1].lower() - return (eval_python is True or (script_extension == constants.PYTHON_SCRIPT_FILE_EXTENSION and - eval_python is not False)) - - -def _treat_script_as_powershell_script(script_path): - script_extension = os.path.splitext(script_path)[1].lower() - return script_extension == constants.POWERSHELL_SCRIPT_FILE_EXTENSION - - -def _eval_script_func(script_path, ctx, operation_kwargs, **_): - with python_script_scope(operation_ctx=ctx, operation_inputs=operation_kwargs): - execfile(script_path, environment_globals.create_initial_globals(script_path)) - - -def _execute_func(script_path, ctx, process, operation_kwargs): - os.chmod(script_path, 0755) - process = common.create_process_config( - script_path=script_path, - process=process, - operation_kwargs=operation_kwargs) - command = process['command'] - env = os.environ.copy() - env.update(process['env']) - ctx.logger.info('Executing: {0}'.format(command)) - with ctx_proxy.server.CtxProxy(ctx, common.patch_ctx) as proxy: - env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url - running_process = subprocess.Popen( - command, - shell=True, - stdout=subprocess.PIPE, - stderr=subprocess.PIPE, - env=env, - cwd=process.get('cwd'), - bufsize=1, - close_fds=not common.is_windows()) - stdout_consumer = _OutputConsumer(running_process.stdout) - stderr_consumer = _OutputConsumer(running_process.stderr) - exit_code = running_process.wait() - stdout_consumer.join() - stderr_consumer.join() - ctx.logger.info('Execution done (exit_code={0}): {1}'.format(exit_code, command)) - - def error_check_func(): - if exit_code: - raise exceptions.ProcessException( - command=command, - exit_code=exit_code, - stdout=stdout_consumer.read_output(), - stderr=stderr_consumer.read_output()) - return common.check_error(ctx, error_check_func=error_check_func) - - -class _OutputConsumer(object): - - def __init__(self, out): - self._out = out - self._buffer = StringIO.StringIO() - self._consumer = threading.Thread(target=self._consume_output) - self._consumer.daemon = True - self._consumer.start() - - def _consume_output(self): - for line in iter(self._out.readline, b''): - self._buffer.write(line) - self._out.close() - - def read_output(self): - return self._buffer.getvalue() - - def join(self): - self._consumer.join() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/operations.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/operations.py b/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/operations.py deleted file mode 100644 index e8de545..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/operations.py +++ /dev/null @@ -1,74 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Entry point functions. -""" - -from aria.orchestrator import operation -from . import local as local_operations - - -@operation -def run_script_locally(ctx, - script_path, - process=None, - **kwargs): - return local_operations.run_script( - ctx=ctx, - script_path=script_path, - process=process, - **kwargs) - - -@operation -def run_script_with_ssh(ctx, - script_path, - fabric_env=None, - process=None, - use_sudo=False, - hide_output=None, - **kwargs): - return _try_import_ssh().run_script( - ctx=ctx, - script_path=script_path, - fabric_env=fabric_env, - process=process, - use_sudo=use_sudo, - hide_output=hide_output, - **kwargs) - - -@operation -def run_commands_with_ssh(ctx, - commands, - fabric_env=None, - use_sudo=False, - hide_output=None, - **_): - return _try_import_ssh().run_commands( - ctx=ctx, - commands=commands, - fabric_env=fabric_env, - use_sudo=use_sudo, - hide_output=hide_output) - - -def _try_import_ssh(): - try: - from .ssh import operations as ssh_operations - return ssh_operations - except Exception: - raise RuntimeError('Failed to import SSH modules; Have you installed the ARIA SSH extra?') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/ssh/operations.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/ssh/operations.py b/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/ssh/operations.py deleted file mode 100644 index c40e783..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/ssh/operations.py +++ /dev/null @@ -1,195 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Utilities for running commands remotely over SSH. -""" - -import os -import random -import string -import tempfile -import StringIO - -import fabric.api -import fabric.context_managers -import fabric.contrib.files - -from .. import constants -from .. import exceptions -from .. import common -from .. import ctx_proxy -from . import tunnel - - -_PROXY_CLIENT_PATH = ctx_proxy.client.__file__ -if _PROXY_CLIENT_PATH.endswith('.pyc'): - _PROXY_CLIENT_PATH = _PROXY_CLIENT_PATH[:-1] - - -def run_commands(ctx, commands, fabric_env, use_sudo, hide_output, **_): - """Runs the provider 'commands' in sequence - - :param commands: a list of commands to run - :param fabric_env: fabric configuration - """ - with fabric.api.settings(_hide_output(ctx, groups=hide_output), - **_fabric_env(ctx, fabric_env, warn_only=True)): - for command in commands: - ctx.logger.info('Running command: {0}'.format(command)) - run = fabric.api.sudo if use_sudo else fabric.api.run - result = run(command) - if result.failed: - raise exceptions.ProcessException( - command=result.command, - exit_code=result.return_code, - stdout=result.stdout, - stderr=result.stderr) - - -def run_script(ctx, script_path, fabric_env, process, use_sudo, hide_output, **kwargs): - process = process or {} - paths = _Paths(base_dir=process.get('base_dir', constants.DEFAULT_BASE_DIR), - local_script_path=common.download_script(ctx, script_path)) - with fabric.api.settings(_hide_output(ctx, groups=hide_output), - **_fabric_env(ctx, fabric_env, warn_only=False)): - # the remote host must have the ctx before running any fabric scripts - if not fabric.contrib.files.exists(paths.remote_ctx_path): - # there may be race conditions with other operations that - # may be running in parallel, so we pass -p to make sure - # we get 0 exit code if the directory already exists - fabric.api.run('mkdir -p {0} && mkdir -p {1}'.format(paths.remote_scripts_dir, - paths.remote_work_dir)) - # this file has to be present before using ctx - fabric.api.put(_PROXY_CLIENT_PATH, paths.remote_ctx_path) - process = common.create_process_config( - script_path=paths.remote_script_path, - process=process, - operation_kwargs=kwargs, - quote_json_env_vars=True) - fabric.api.put(paths.local_script_path, paths.remote_script_path) - with ctx_proxy.server.CtxProxy(ctx, _patch_ctx) as proxy: - local_port = proxy.port - with fabric.context_managers.cd(process.get('cwd', paths.remote_work_dir)): # pylint: disable=not-context-manager - with tunnel.remote(ctx, local_port=local_port) as remote_port: - local_socket_url = proxy.socket_url - remote_socket_url = local_socket_url.replace(str(local_port), str(remote_port)) - env_script = _write_environment_script_file( - process=process, - paths=paths, - local_socket_url=local_socket_url, - remote_socket_url=remote_socket_url) - fabric.api.put(env_script, paths.remote_env_script_path) - try: - command = 'source {0} && {1}'.format(paths.remote_env_script_path, - process['command']) - run = fabric.api.sudo if use_sudo else fabric.api.run - run(command) - except exceptions.TaskException: - return common.check_error(ctx, reraise=True) - return common.check_error(ctx) - - -def _patch_ctx(ctx): - common.patch_ctx(ctx) - original_download_resource = ctx.download_resource - original_download_resource_and_render = ctx.download_resource_and_render - - def _download_resource(func, destination, **kwargs): - handle, temp_local_path = tempfile.mkstemp() - os.close(handle) - try: - func(destination=temp_local_path, **kwargs) - return fabric.api.put(temp_local_path, destination) - finally: - os.remove(temp_local_path) - - def download_resource(destination, path=None): - _download_resource( - func=original_download_resource, - destination=destination, - path=path) - ctx.download_resource = download_resource - - def download_resource_and_render(destination, path=None, variables=None): - _download_resource( - func=original_download_resource_and_render, - destination=destination, - path=path, - variables=variables) - ctx.download_resource_and_render = download_resource_and_render - - -def _hide_output(ctx, groups): - """ Hides Fabric's output for every 'entity' in `groups` """ - groups = set(groups or []) - if not groups.issubset(constants.VALID_FABRIC_GROUPS): - ctx.task.abort('`hide_output` must be a subset of {0} (Provided: {1})' - .format(', '.join(constants.VALID_FABRIC_GROUPS), ', '.join(groups))) - return fabric.api.hide(*groups) - - -def _fabric_env(ctx, fabric_env, warn_only): - """Prepares fabric environment variables configuration""" - ctx.logger.debug('Preparing fabric environment...') - env = constants.FABRIC_ENV_DEFAULTS.copy() - env.update(fabric_env or {}) - env.setdefault('warn_only', warn_only) - # validations - if (not env.get('host_string')) and (ctx.task) and (ctx.task.actor) and (ctx.task.actor.host): - env['host_string'] = ctx.task.actor.host.host_address - if not env.get('host_string'): - ctx.task.abort('`host_string` not supplied and ip cannot be deduced automatically') - if not (env.get('password') or env.get('key_filename') or env.get('key')): - ctx.task.abort( - 'Access credentials not supplied ' - '(you must supply at least one of `key_filename`, `key` or `password`)') - if not env.get('user'): - ctx.task.abort('`user` not supplied') - ctx.logger.debug('Environment prepared successfully') - return env - - -def _write_environment_script_file(process, paths, local_socket_url, remote_socket_url): - env_script = StringIO.StringIO() - env = process['env'] - env['PATH'] = '{0}:$PATH'.format(paths.remote_ctx_dir) - env['PYTHONPATH'] = '{0}:$PYTHONPATH'.format(paths.remote_ctx_dir) - env_script.write('chmod +x {0}\n'.format(paths.remote_script_path)) - env_script.write('chmod +x {0}\n'.format(paths.remote_ctx_path)) - env.update({ - ctx_proxy.client.CTX_SOCKET_URL: remote_socket_url, - 'LOCAL_{0}'.format(ctx_proxy.client.CTX_SOCKET_URL): local_socket_url - }) - for key, value in env.iteritems(): - env_script.write('export {0}={1}\n'.format(key, value)) - return env_script - - -class _Paths(object): - - def __init__(self, base_dir, local_script_path): - self.local_script_path = local_script_path - self.remote_ctx_dir = base_dir - self.base_script_path = os.path.basename(self.local_script_path) - self.remote_ctx_path = '{0}/ctx'.format(self.remote_ctx_dir) - self.remote_scripts_dir = '{0}/scripts'.format(self.remote_ctx_dir) - self.remote_work_dir = '{0}/work'.format(self.remote_ctx_dir) - random_suffix = ''.join(random.choice(string.ascii_lowercase + string.digits) - for _ in range(8)) - remote_path_suffix = '{0}-{1}'.format(self.base_script_path, random_suffix) - self.remote_env_script_path = '{0}/env-{1}'.format(self.remote_scripts_dir, - remote_path_suffix) - self.remote_script_path = '{0}/{1}'.format(self.remote_scripts_dir, remote_path_suffix) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/ssh/tunnel.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/ssh/tunnel.py b/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/ssh/tunnel.py deleted file mode 100644 index e76d525..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/execution_plugin/ssh/tunnel.py +++ /dev/null @@ -1,107 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# This implementation was copied from the Fabric project directly: -# https://github.com/fabric/fabric/blob/master/fabric/context_managers.py#L486 -# The purpose was to remove the rtunnel creation printouts here: -# https://github.com/fabric/fabric/blob/master/fabric/context_managers.py#L547 - - -import contextlib -import select -import socket - -import fabric.api -import fabric.state -import fabric.thread_handling - - -@contextlib.contextmanager -def remote(ctx, local_port, remote_port=0, local_host='localhost', remote_bind_address='127.0.0.1'): - """Create a tunnel forwarding a locally-visible port to the remote target.""" - sockets = [] - channels = [] - thread_handlers = [] - - def accept(channel, *args, **kwargs): - # This seemingly innocent statement seems to be doing nothing - # but the truth is far from it! - # calling fileno() on a paramiko channel the first time, creates - # the required plumbing to make the channel valid for select. - # While this would generally happen implicitly inside the _forwarder - # function when select is called, it may already be too late and may - # cause the select loop to hang. - # Specifically, when new data arrives to the channel, a flag is set - # on an "event" object which is what makes the select call work. - # problem is this will only happen if the event object is not None - # and it will be not-None only after channel.fileno() has been called - # for the first time. If we wait until _forwarder calls select for the - # first time it may be after initial data has reached the channel. - # calling it explicitly here in the paramiko transport main event loop - # guarantees this will not happen. - channel.fileno() - - channels.append(channel) - sock = socket.socket() - sockets.append(sock) - - try: - sock.connect((local_host, local_port)) - except Exception as e: - try: - channel.close() - except Exception as ex2: - close_error = ' (While trying to close channel: {0})'.format(ex2) - else: - close_error = '' - ctx.task.abort('[{0}] rtunnel: cannot connect to {1}:{2} ({3}){4}' - .format(fabric.api.env.host_string, local_host, local_port, e, - close_error)) - - thread_handler = fabric.thread_handling.ThreadHandler('fwd', _forwarder, channel, sock) - thread_handlers.append(thread_handler) - - transport = fabric.state.connections[fabric.api.env.host_string].get_transport() - remote_port = transport.request_port_forward( - remote_bind_address, remote_port, handler=accept) - - try: - yield remote_port - finally: - for sock, chan, thread_handler in zip(sockets, channels, thread_handlers): - sock.close() - chan.close() - thread_handler.thread.join() - thread_handler.raise_if_needed() - transport.cancel_port_forward(remote_bind_address, remote_port) - - -def _forwarder(chan, sock): - # Bidirectionally forward data between a socket and a Paramiko channel. - while True: - read = select.select([sock, chan], [], [])[0] - if sock in read: - data = sock.recv(1024) - if len(data) == 0: - break - chan.send(data) - if chan in read: - data = chan.recv(1024) - if len(data) == 0: - break - sock.send(data) - chan.close() - sock.close() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/plugin.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/plugin.py b/apache-ariatosca-0.1.1/aria/orchestrator/plugin.py deleted file mode 100644 index 756a28e..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/plugin.py +++ /dev/null @@ -1,171 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Plugin management. -""" - -import os -import tempfile -import subprocess -import sys -import zipfile -from datetime import datetime - -import wagon - -from . import exceptions -from ..utils import process as process_utils - -_IS_WIN = os.name == 'nt' - - -class PluginManager(object): - - def __init__(self, model, plugins_dir): - """ - :param plugins_dir: root directory in which to install plugins - """ - self._model = model - self._plugins_dir = plugins_dir - - def install(self, source): - """ - Install a wagon plugin. - """ - metadata = wagon.show(source) - cls = self._model.plugin.model_cls - - os_props = metadata['build_server_os_properties'] - - plugin = cls( - name=metadata['package_name'], - archive_name=metadata['archive_name'], - supported_platform=metadata['supported_platform'], - supported_py_versions=metadata['supported_python_versions'], - distribution=os_props.get('distribution'), - distribution_release=os_props['distribution_version'], - distribution_version=os_props['distribution_release'], - package_name=metadata['package_name'], - package_version=metadata['package_version'], - package_source=metadata['package_source'], - wheels=metadata['wheels'], - uploaded_at=datetime.now() - ) - if len(self._model.plugin.list(filters={'package_name': plugin.package_name, - 'package_version': plugin.package_version})): - raise exceptions.PluginAlreadyExistsError( - 'Plugin {0}, version {1} already exists'.format(plugin.package_name, - plugin.package_version)) - self._install_wagon(source=source, prefix=self.get_plugin_dir(plugin)) - self._model.plugin.put(plugin) - return plugin - - def load_plugin(self, plugin, env=None): - """ - Load the plugin into an environment. - - Loading the plugin means the plugin's code and binaries paths will be appended to the - environment's ``PATH`` and ``PYTHONPATH``, thereby allowing usage of the plugin. - - :param plugin: plugin to load - :param env: environment to load the plugin into; If ``None``, :obj:`os.environ` will be - used - """ - env = env or os.environ - plugin_dir = self.get_plugin_dir(plugin) - - # Update PATH environment variable to include plugin's bin dir - bin_dir = 'Scripts' if _IS_WIN else 'bin' - process_utils.append_to_path(os.path.join(plugin_dir, bin_dir), env=env) - - # Update PYTHONPATH environment variable to include plugin's site-packages - # directories - if _IS_WIN: - pythonpath_dirs = [os.path.join(plugin_dir, 'Lib', 'site-packages')] - else: - # In some linux environments, there will be both a lib and a lib64 directory - # with the latter, containing compiled packages. - pythonpath_dirs = [os.path.join( - plugin_dir, 'lib{0}'.format(b), - 'python{0}.{1}'.format(sys.version_info[0], sys.version_info[1]), - 'site-packages') for b in ('', '64')] - - process_utils.append_to_pythonpath(*pythonpath_dirs, env=env) - - def get_plugin_dir(self, plugin): - return os.path.join( - self._plugins_dir, - '{0}-{1}'.format(plugin.package_name, plugin.package_version)) - - @staticmethod - def validate_plugin(source): - """ - Validate a plugin archive. - - A valid plugin is a `wagon `__ in the zip format - (suffix may also be ``.wgn``). - """ - if not zipfile.is_zipfile(source): - raise exceptions.InvalidPluginError( - 'Archive {0} is of an unsupported type. Only ' - 'zip/wgn is allowed'.format(source)) - with zipfile.ZipFile(source, 'r') as zip_file: - infos = zip_file.infolist() - try: - package_name = infos[0].filename[:infos[0].filename.index('/')] - package_json_path = "{0}/{1}".format(package_name, 'package.json') - zip_file.getinfo(package_json_path) - except (KeyError, ValueError, IndexError): - raise exceptions.InvalidPluginError( - 'Failed to validate plugin {0} ' - '(package.json was not found in archive)'.format(source)) - - def _install_wagon(self, source, prefix): - pip_freeze_output = self._pip_freeze() - file_descriptor, constraint_path = tempfile.mkstemp(prefix='constraint-', suffix='.txt') - os.close(file_descriptor) - try: - with open(constraint_path, 'wb') as constraint: - constraint.write(pip_freeze_output) - # Install the provided wagon. - # * The --prefix install_arg will cause the plugin to be installed under - # plugins_dir/{package_name}-{package_version}, So different plugins don't step on - # each other and don't interfere with the current virtualenv - # * The --constraint flag points a file containing the output of ``pip freeze``. - # It is required, to handle cases where plugins depend on some python package with - # a different version than the one installed in the current virtualenv. Without this - # flag, the existing package will be **removed** from the parent virtualenv and the - # new package will be installed under prefix. With the flag, the existing version will - # remain, and the version requested by the plugin will be ignored. - wagon.install( - source=source, - install_args='--prefix="{prefix}" --constraint="{constraint}"'.format( - prefix=prefix, - constraint=constraint.name), - venv=os.environ.get('VIRTUAL_ENV')) - finally: - os.remove(constraint_path) - - @staticmethod - def _pip_freeze(): - """Run pip freeze in current environment and return the output""" - bin_dir = 'Scripts' if os.name == 'nt' else 'bin' - pip_path = os.path.join(sys.prefix, bin_dir, - 'pip{0}'.format('.exe' if os.name == 'nt' else '')) - pip_freeze = subprocess.Popen([pip_path, 'freeze'], stdout=subprocess.PIPE) - pip_freeze_output, _ = pip_freeze.communicate() - assert not pip_freeze.poll() - return pip_freeze_output http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflow_runner.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflow_runner.py deleted file mode 100644 index df1725f..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflow_runner.py +++ /dev/null @@ -1,181 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Running workflows. -""" - -import os -import sys -from datetime import datetime - -from . import exceptions -from .context.workflow import WorkflowContext -from .workflows import builtin -from .workflows.core import engine, graph_compiler -from .workflows.executor.process import ProcessExecutor -from ..modeling import models -from ..modeling import utils as modeling_utils -from ..utils.imports import import_fullname - - -DEFAULT_TASK_MAX_ATTEMPTS = 30 -DEFAULT_TASK_RETRY_INTERVAL = 30 - - -class WorkflowRunner(object): - - def __init__(self, model_storage, resource_storage, plugin_manager, - execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None, - task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, - task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): - """ - Manages a single workflow execution on a given service. - - :param workflow_name: workflow name - :param service_id: service ID - :param inputs: key-value dict of inputs for the execution - :param model_storage: model storage API ("MAPI") - :param resource_storage: resource storage API ("RAPI") - :param plugin_manager: plugin manager - :param executor: executor for tasks; defaults to a - :class:`~aria.orchestrator.workflows.executor.process.ProcessExecutor` instance - :param task_max_attempts: maximum attempts of repeating each failing task - :param task_retry_interval: retry interval between retry attempts of a failing task - """ - - if not (execution_id or (workflow_name and service_id)): - exceptions.InvalidWorkflowRunnerParams( - "Either provide execution id in order to resume a workflow or workflow name " - "and service id with inputs") - - self._is_resume = execution_id is not None - - self._model_storage = model_storage - self._resource_storage = resource_storage - - # the IDs are stored rather than the models themselves, so this module could be used - # by several threads without raising errors on model objects shared between threads - - if self._is_resume: - self._execution_id = execution_id - self._service_id = self.execution.service.id - self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name - else: - self._service_id = service_id - self._workflow_name = workflow_name - self._validate_workflow_exists_for_service() - self._execution_id = self._create_execution_model(inputs).id - - self._workflow_context = WorkflowContext( - name=self.__class__.__name__, - model_storage=self._model_storage, - resource_storage=resource_storage, - service_id=service_id, - execution_id=self._execution_id, - workflow_name=self._workflow_name, - task_max_attempts=task_max_attempts, - task_retry_interval=task_retry_interval) - - # Set default executor and kwargs - executor = executor or ProcessExecutor(plugin_manager=plugin_manager) - - # transforming the execution inputs to dict, to pass them to the workflow function - execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values()) - - if not self._is_resume: - workflow_fn = self._get_workflow_fn() - self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) - compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__) - compiler.compile(self._tasks_graph) - - self._engine = engine.Engine(executors={executor.__class__: executor}) - - @property - def execution_id(self): - return self._execution_id - - @property - def execution(self): - return self._model_storage.execution.get(self.execution_id) - - @property - def service(self): - return self._model_storage.service.get(self._service_id) - - def execute(self): - self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume) - - def cancel(self): - self._engine.cancel_execution(ctx=self._workflow_context) - - def _create_execution_model(self, inputs): - execution = models.Execution( - created_at=datetime.utcnow(), - service=self.service, - workflow_name=self._workflow_name, - inputs={}) - - if self._workflow_name in builtin.BUILTIN_WORKFLOWS: - workflow_inputs = dict() # built-in workflows don't have any inputs - else: - workflow_inputs = self.service.workflows[self._workflow_name].inputs - - execution.inputs = modeling_utils.merge_parameter_values(inputs, - workflow_inputs, - model_cls=models.Input) - # TODO: these two following calls should execute atomically - self._validate_no_active_executions(execution) - self._model_storage.execution.put(execution) - return execution - - def _validate_workflow_exists_for_service(self): - if self._workflow_name not in self.service.workflows and \ - self._workflow_name not in builtin.BUILTIN_WORKFLOWS: - raise exceptions.UndeclaredWorkflowError( - 'No workflow policy {0} declared in service {1}' - .format(self._workflow_name, self.service.name)) - - def _validate_no_active_executions(self, execution): - active_executions = [e for e in self.service.executions if e.is_active()] - if active_executions: - raise exceptions.ActiveExecutionsError( - "Can't start execution; Service {0} has an active execution with ID {1}" - .format(self.service.name, active_executions[0].id)) - - def _get_workflow_fn(self): - if self._workflow_name in builtin.BUILTIN_WORKFLOWS: - return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX, - self._workflow_name)) - - workflow = self.service.workflows[self._workflow_name] - - # TODO: Custom workflow support needs improvement, currently this code uses internal - # knowledge of the resource storage; Instead, workflows should probably be loaded - # in a similar manner to operation plugins. Also consider passing to import_fullname - # as paths instead of appending to sys path. - service_template_resources_path = os.path.join( - self._resource_storage.service_template.base_path, - str(self.service.service_template.id)) - sys.path.append(service_template_resources_path) - - try: - workflow_fn = import_fullname(workflow.function) - except ImportError: - raise exceptions.WorkflowImplementationNotFoundError( - 'Could not find workflow {0} function at {1}'.format( - self._workflow_name, workflow.function)) - - return workflow_fn http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/api/task.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/api/task.py deleted file mode 100644 index 4c518fc..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/api/task.py +++ /dev/null @@ -1,268 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Provides the tasks to be entered into the task graph -""" - -from ... import context -from ....modeling import models -from ....modeling import utils as modeling_utils -from ....utils.uuid import generate_uuid -from .. import exceptions - - -class BaseTask(object): - """ - Base class for tasks. - """ - - def __init__(self, ctx=None, **kwargs): - if ctx is not None: - self._workflow_context = ctx - else: - self._workflow_context = context.workflow.current.get() - self._id = generate_uuid(variant='uuid') - - @property - def id(self): - """ - UUID4 ID. - """ - return self._id - - @property - def workflow_context(self): - """ - Context of the current workflow. - """ - return self._workflow_context - - -class OperationTask(BaseTask): - """ - Executes an operation. - - :ivar name: formatted name (includes actor type, actor name, and interface/operation names) - :vartype name: basestring - :ivar actor: node or relationship - :vartype actor: :class:`~aria.modeling.models.Node` or - :class:`~aria.modeling.models.Relationship` - :ivar interface_name: interface name on actor - :vartype interface_name: basestring - :ivar operation_name: operation name on interface - :vartype operation_name: basestring - :ivar plugin: plugin (or None for default plugin) - :vartype plugin: :class:`~aria.modeling.models.Plugin` - :ivar function: path to Python function - :vartype function: basestring - :ivar arguments: arguments to send to Python function - :vartype arguments: {:obj:`basestring`: :class:`~aria.modeling.models.Argument`} - :ivar ignore_failure: whether to ignore failures - :vartype ignore_failure: bool - :ivar max_attempts: maximum number of attempts allowed in case of failure - :vartype max_attempts: int - :ivar retry_interval: interval between retries (in seconds) - :vartype retry_interval: float - """ - - NAME_FORMAT = '{interface}:{operation}@{type}:{name}' - - def __init__(self, - actor, - interface_name, - operation_name, - arguments=None, - ignore_failure=None, - max_attempts=None, - retry_interval=None): - """ - :param actor: node or relationship - :type actor: :class:`~aria.modeling.models.Node` or - :class:`~aria.modeling.models.Relationship` - :param interface_name: interface name on actor - :type interface_name: basestring - :param operation_name: operation name on interface - :type operation_name: basestring - :param arguments: override argument values - :type arguments: {:obj:`basestring`: object} - :param ignore_failure: override whether to ignore failures - :type ignore_failure: bool - :param max_attempts: override maximum number of attempts allowed in case of failure - :type max_attempts: int - :param retry_interval: override interval between retries (in seconds) - :type retry_interval: float - :raises ~aria.orchestrator.workflows.exceptions.OperationNotFoundException: if - ``interface_name`` and ``operation_name`` do not refer to an operation on the actor - """ - - # Creating OperationTask directly should raise an error when there is no - # interface/operation. - if not has_operation(actor, interface_name, operation_name): - raise exceptions.OperationNotFoundException( - 'Could not find operation "{operation_name}" on interface ' - '"{interface_name}" for {actor_type} "{actor.name}"'.format( - operation_name=operation_name, - interface_name=interface_name, - actor_type=type(actor).__name__.lower(), - actor=actor) - ) - - super(OperationTask, self).__init__() - - self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(), - name=actor.name, - interface=interface_name, - operation=operation_name) - self.actor = actor - self.interface_name = interface_name - self.operation_name = operation_name - self.ignore_failure = \ - self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure - self.max_attempts = max_attempts or self.workflow_context._task_max_attempts - self.retry_interval = retry_interval or self.workflow_context._task_retry_interval - - operation = self.actor.interfaces[self.interface_name].operations[self.operation_name] - self.plugin = operation.plugin - self.function = operation.function - self.arguments = modeling_utils.merge_parameter_values(arguments, - operation.arguments, - model_cls=models.Argument) - if getattr(self.actor, 'outbound_relationships', None) is not None: - self._context_cls = context.operation.NodeOperationContext - elif getattr(self.actor, 'source_node', None) is not None: - self._context_cls = context.operation.RelationshipOperationContext - else: - raise exceptions.TaskCreationException('Could not locate valid context for ' - '{actor.__class__}'.format(actor=self.actor)) - - def __repr__(self): - return self.name - - -class StubTask(BaseTask): - """ - Enables creating empty tasks. - """ - - -class WorkflowTask(BaseTask): - """ - Executes a complete workflow. - """ - - def __init__(self, workflow_func, **kwargs): - """ - :param workflow_func: function to run - :param kwargs: kwargs that would be passed to the workflow_func - """ - super(WorkflowTask, self).__init__(**kwargs) - kwargs['ctx'] = self.workflow_context - self._graph = workflow_func(**kwargs) - - @property - def graph(self): - """ - Graph constructed by the sub workflow. - """ - return self._graph - - def __getattr__(self, item): - try: - return getattr(self._graph, item) - except AttributeError: - return super(WorkflowTask, self).__getattribute__(item) - - -def create_task(actor, interface_name, operation_name, **kwargs): - """ - Helper function that enables safe creation of :class:`OperationTask`. If the supplied interface - or operation do not exist, ``None`` is returned. - - :param actor: actor for this task - :param interface_name: name of the interface - :param operation_name: name of the operation - :param kwargs: any additional kwargs to be passed to the OperationTask - :return: OperationTask or None (if the interface/operation does not exists) - """ - try: - return OperationTask( - actor, - interface_name=interface_name, - operation_name=operation_name, - **kwargs - ) - except exceptions.OperationNotFoundException: - return None - - -def create_relationships_tasks( - node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs): - """ - Creates a relationship task (source and target) for all of a node relationships. - - :param basestring source_operation_name: relationship operation name - :param basestring interface_name: name of the interface - :param source_operation_name: - :param target_operation_name: - :param node: source node - """ - sub_tasks = [] - for relationship in node.outbound_relationships: - relationship_operations = create_relationship_tasks( - relationship, - interface_name, - source_operation_name=source_operation_name, - target_operation_name=target_operation_name, - **kwargs) - sub_tasks.append(relationship_operations) - return sub_tasks - - -def create_relationship_tasks(relationship, interface_name, source_operation_name=None, - target_operation_name=None, **kwargs): - """ - Creates a relationship task (source and target). - - :param relationship: relationship instance itself - :param source_operation_name: - :param target_operation_name: - """ - operations = [] - if source_operation_name: - operations.append( - create_task( - relationship, - interface_name=interface_name, - operation_name=source_operation_name, - **kwargs - ) - ) - if target_operation_name: - operations.append( - create_task( - relationship, - interface_name=interface_name, - operation_name=target_operation_name, - **kwargs - ) - ) - - return [o for o in operations if o] - - -def has_operation(actor, interface_name, operation_name): - interface = actor.interfaces.get(interface_name, None) - return interface and interface.operations.get(operation_name, False) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/api/task_graph.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/api/task_graph.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/api/task_graph.py deleted file mode 100644 index 900a0d1..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/api/task_graph.py +++ /dev/null @@ -1,295 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Task graph. -""" - -from collections import Iterable - -from networkx import DiGraph, topological_sort - -from ....utils.uuid import generate_uuid -from . import task as api_task - - -class TaskNotInGraphError(Exception): - """ - An error representing a scenario where a given task is not in the graph as expected. - """ - pass - - -def _filter_out_empty_tasks(func=None): - if func is None: - return lambda f: _filter_out_empty_tasks(func=f) - - def _wrapper(task, *tasks, **kwargs): - return func(*(t for t in (task,) + tuple(tasks) if t), **kwargs) - return _wrapper - - -class TaskGraph(object): - """ - Task graph builder. - """ - - def __init__(self, name): - self.name = name - self._id = generate_uuid(variant='uuid') - self._graph = DiGraph() - - def __repr__(self): - return '{name}(id={self._id}, name={self.name}, graph={self._graph!r})'.format( - name=self.__class__.__name__, self=self) - - @property - def id(self): - """ - ID of the graph - """ - return self._id - - # graph traversal methods - - @property - def tasks(self): - """ - Iterator over tasks in the graph. - """ - for _, data in self._graph.nodes_iter(data=True): - yield data['task'] - - def topological_order(self, reverse=False): - """ - Topological sort of the graph. - - :param reverse: whether to reverse the sort - :return: list which represents the topological sort - """ - for task_id in topological_sort(self._graph, reverse=reverse): - yield self.get_task(task_id) - - def get_dependencies(self, dependent_task): - """ - Iterates over the task's dependencies. - - :param dependent_task: task whose dependencies are requested - :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if - ``dependent_task`` is not in the graph - """ - if not self.has_tasks(dependent_task): - raise TaskNotInGraphError('Task id: {0}'.format(dependent_task.id)) - for _, dependency_id in self._graph.out_edges_iter(dependent_task.id): - yield self.get_task(dependency_id) - - def get_dependents(self, dependency_task): - """ - Iterates over the task's dependents. - - :param dependency_task: task whose dependents are requested - :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if - ``dependency_task`` is not in the graph - """ - if not self.has_tasks(dependency_task): - raise TaskNotInGraphError('Task id: {0}'.format(dependency_task.id)) - for dependent_id, _ in self._graph.in_edges_iter(dependency_task.id): - yield self.get_task(dependent_id) - - # task methods - - def get_task(self, task_id): - """ - Get a task instance that's been inserted to the graph by the task's ID. - - :param basestring task_id: task ID - :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if no task found in - the graph with the given ID - """ - if not self._graph.has_node(task_id): - raise TaskNotInGraphError('Task id: {0}'.format(task_id)) - data = self._graph.node[task_id] - return data['task'] - - @_filter_out_empty_tasks - def add_tasks(self, *tasks): - """ - Adds a task to the graph. - - :param task: task - :return: list of added tasks - :rtype: list - """ - assert all([isinstance(task, (api_task.BaseTask, Iterable)) for task in tasks]) - return_tasks = [] - - for task in tasks: - if isinstance(task, Iterable): - return_tasks += self.add_tasks(*task) - elif not self.has_tasks(task): - self._graph.add_node(task.id, task=task) - return_tasks.append(task) - - return return_tasks - - @_filter_out_empty_tasks - def remove_tasks(self, *tasks): - """ - Removes the provided task from the graph. - - :param task: task - :return: list of removed tasks - :rtype: list - """ - return_tasks = [] - - for task in tasks: - if isinstance(task, Iterable): - return_tasks += self.remove_tasks(*task) - elif self.has_tasks(task): - self._graph.remove_node(task.id) - return_tasks.append(task) - - return return_tasks - - @_filter_out_empty_tasks - def has_tasks(self, *tasks): - """ - Checks whether a task is in the graph. - - :param task: task - :return: ``True`` if all tasks are in the graph, otherwise ``False`` - :rtype: list - """ - assert all(isinstance(t, (api_task.BaseTask, Iterable)) for t in tasks) - return_value = True - - for task in tasks: - if isinstance(task, Iterable): - return_value &= self.has_tasks(*task) - else: - return_value &= self._graph.has_node(task.id) - - return return_value - - def add_dependency(self, dependent, dependency): - """ - Adds a dependency for one item (task, sequence or parallel) on another. - - The dependent will only be executed after the dependency terminates. If either of the items - is either a sequence or a parallel, multiple dependencies may be added. - - :param dependent: dependent (task, sequence or parallel) - :param dependency: dependency (task, sequence or parallel) - :return: ``True`` if the dependency between the two hadn't already existed, otherwise - ``False`` - :rtype: bool - :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the - dependent or dependency are tasks which are not in the graph - """ - if not (self.has_tasks(dependent) and self.has_tasks(dependency)): - raise TaskNotInGraphError() - - if self.has_dependency(dependent, dependency): - return - - if isinstance(dependent, Iterable): - for dependent_task in dependent: - self.add_dependency(dependent_task, dependency) - else: - if isinstance(dependency, Iterable): - for dependency_task in dependency: - self.add_dependency(dependent, dependency_task) - else: - self._graph.add_edge(dependent.id, dependency.id) - - def has_dependency(self, dependent, dependency): - """ - Checks whether one item (task, sequence or parallel) depends on another. - - Note that if either of the items is either a sequence or a parallel, and some of the - dependencies exist in the graph but not all of them, this method will return ``False``. - - :param dependent: dependent (task, sequence or parallel) - :param dependency: dependency (task, sequence or parallel) - :return: ``True`` if the dependency between the two exists, otherwise ``False`` - :rtype: bool - :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the - dependent or dependency are tasks which are not in the graph - """ - if not (dependent and dependency): - return False - elif not (self.has_tasks(dependent) and self.has_tasks(dependency)): - raise TaskNotInGraphError() - - return_value = True - - if isinstance(dependent, Iterable): - for dependent_task in dependent: - return_value &= self.has_dependency(dependent_task, dependency) - else: - if isinstance(dependency, Iterable): - for dependency_task in dependency: - return_value &= self.has_dependency(dependent, dependency_task) - else: - return_value &= self._graph.has_edge(dependent.id, dependency.id) - - return return_value - - def remove_dependency(self, dependent, dependency): - """ - Removes a dependency for one item (task, sequence or parallel) on another. - - Note that if either of the items is either a sequence or a parallel, and some of the - dependencies exist in the graph but not all of them, this method will not remove any of the - dependencies and return ``False``. - - :param dependent: dependent (task, sequence or parallel) - :param dependency: dependency (task, sequence or parallel) - :return: ``False`` if the dependency between the two hadn't existed, otherwise ``True`` - :rtype: bool - :raises ~aria.orchestrator.workflows.api.task_graph.TaskNotInGraphError: if either the - dependent or dependency are tasks which are not in the graph - """ - if not (self.has_tasks(dependent) and self.has_tasks(dependency)): - raise TaskNotInGraphError() - - if not self.has_dependency(dependent, dependency): - return - - if isinstance(dependent, Iterable): - for dependent_task in dependent: - self.remove_dependency(dependent_task, dependency) - elif isinstance(dependency, Iterable): - for dependency_task in dependency: - self.remove_dependency(dependent, dependency_task) - else: - self._graph.remove_edge(dependent.id, dependency.id) - - @_filter_out_empty_tasks - def sequence(self, *tasks): - """ - Creates and inserts a sequence into the graph, effectively each task i depends on i-1. - - :param tasks: iterable of dependencies - :return: provided tasks - """ - if tasks: - self.add_tasks(*tasks) - - for i in xrange(1, len(tasks)): - self.add_dependency(tasks[i], tasks[i-1]) - - return tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/execute_operation.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/execute_operation.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/execute_operation.py deleted file mode 100644 index 949f864..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/execute_operation.py +++ /dev/null @@ -1,101 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Built-in operation execution Workflow. -""" - -from ... import workflow -from ..api import task - - -@workflow -def execute_operation( - ctx, - graph, - interface_name, - operation_name, - operation_kwargs, - run_by_dependency_order, - type_names, - node_template_ids, - node_ids, - **kwargs): - """ - Built-in operation execution Workflow. - - :param workflow_context: workflow context - :param graph: graph which will describe the workflow - :param operation: operation name to execute - :param operation_kwargs: - :param run_by_dependency_order: - :param type_names: - :param node_template_ids: - :param node_ids: - :param kwargs: - :return: - """ - subgraphs = {} - # filtering node instances - filtered_nodes = list(_filter_nodes( - context=ctx, - node_template_ids=node_template_ids, - node_ids=node_ids, - type_names=type_names)) - - if run_by_dependency_order: - filtered_node_ids = set(node_instance.id for node_instance in filtered_nodes) - for node in ctx.nodes: - if node.id not in filtered_node_ids: - subgraphs[node.id] = ctx.task_graph( - name='execute_operation_stub_{0}'.format(node.id)) - - # registering actual tasks to sequences - for node in filtered_nodes: - graph.add_tasks( - task.OperationTask( - node, - interface_name=interface_name, - operation_name=operation_name, - arguments=operation_kwargs - ) - ) - - for _, node_sub_workflow in subgraphs.items(): - graph.add_tasks(node_sub_workflow) - - # adding tasks dependencies if required - if run_by_dependency_order: - for node in ctx.nodes: - for relationship in node.relationships: - graph.add_dependency( - source_task=subgraphs[node.id], after=[subgraphs[relationship.target_id]]) - - -def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()): - def _is_node_template_by_id(node_template_id): - return not node_template_ids or node_template_id in node_template_ids - - def _is_node_by_id(node_id): - return not node_ids or node_id in node_ids - - def _is_node_by_type(node_type): - return not node_type.name in type_names - - for node in context.nodes: - if all((_is_node_template_by_id(node.node_template.id), - _is_node_by_id(node.id), - _is_node_by_type(node.node_template.type))): - yield node http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/heal.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/heal.py deleted file mode 100644 index 07e27b1..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/heal.py +++ /dev/null @@ -1,179 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# pylint: skip-file - -""" -Built-in heal workflow. -""" - -from aria import workflow - -from .workflows import (install_node, uninstall_node) -from ..api import task - - -@workflow -def heal(ctx, graph, node_id): - """ - Built-in heal workflow.. - - :param ctx: workflow context - :param graph: graph which will describe the workflow. - :param node_id: ID of the node to heal - :return: - """ - failing_node = ctx.model.node.get(node_id) - host_node = ctx.model.node.get(failing_node.host.id) - failed_node_subgraph = _get_contained_subgraph(ctx, host_node) - failed_node_ids = list(n.id for n in failed_node_subgraph) - - targeted_nodes = [node for node in ctx.nodes - if node.id not in failed_node_ids] - - uninstall_subgraph = task.WorkflowTask( - heal_uninstall, - failing_nodes=failed_node_subgraph, - targeted_nodes=targeted_nodes - ) - - install_subgraph = task.WorkflowTask( - heal_install, - failing_nodes=failed_node_subgraph, - targeted_nodes=targeted_nodes) - - graph.sequence(uninstall_subgraph, install_subgraph) - - -@workflow(suffix_template='{failing_nodes}') -def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes): - """ - Uninstall phase of the heal mechanism. - - :param ctx: workflow context - :param graph: task graph to edit - :param failing_nodes: failing nodes to heal - :param targeted_nodes: targets of the relationships where the failing node are - """ - node_sub_workflows = {} - - # Create install stub workflow for each unaffected node - for node in targeted_nodes: - node_stub = task.StubTask() - node_sub_workflows[node.id] = node_stub - graph.add_tasks(node_stub) - - # create install sub workflow for every node - for node in failing_nodes: - node_sub_workflow = task.WorkflowTask(uninstall_node, - node=node) - node_sub_workflows[node.id] = node_sub_workflow - graph.add_tasks(node_sub_workflow) - - # create dependencies between the node sub workflow - for node in failing_nodes: - node_sub_workflow = node_sub_workflows[node.id] - for relationship in reversed(node.outbound_relationships): - graph.add_dependency( - node_sub_workflows[relationship.target_node.id], - node_sub_workflow) - - # Add operations for intact nodes depending on a node belonging to nodes - for node in targeted_nodes: - node_sub_workflow = node_sub_workflows[node.id] - - for relationship in reversed(node.outbound_relationships): - - target_node = \ - ctx.model.node.get(relationship.target_node.id) - target_node_subgraph = node_sub_workflows[target_node.id] - graph.add_dependency(target_node_subgraph, node_sub_workflow) - - if target_node in failing_nodes: - dependency = task.create_relationship_tasks( - relationship=relationship, - operation_name='aria.interfaces.relationship_lifecycle.unlink') - graph.add_tasks(*dependency) - graph.add_dependency(node_sub_workflow, dependency) - - -@workflow(suffix_template='{failing_nodes}') -def heal_install(ctx, graph, failing_nodes, targeted_nodes): - """ - Install phase of the heal mechanism. - - :param ctx: workflow context - :param graph: task graph to edit. - :param failing_nodes: failing nodes to heal - :param targeted_nodes: targets of the relationships where the failing node are - """ - node_sub_workflows = {} - - # Create install sub workflow for each unaffected - for node in targeted_nodes: - node_stub = task.StubTask() - node_sub_workflows[node.id] = node_stub - graph.add_tasks(node_stub) - - # create install sub workflow for every node - for node in failing_nodes: - node_sub_workflow = task.WorkflowTask(install_node, - node=node) - node_sub_workflows[node.id] = node_sub_workflow - graph.add_tasks(node_sub_workflow) - - # create dependencies between the node sub workflow - for node in failing_nodes: - node_sub_workflow = node_sub_workflows[node.id] - if node.outbound_relationships: - dependencies = \ - [node_sub_workflows[relationship.target_node.id] - for relationship in node.outbound_relationships] - graph.add_dependency(node_sub_workflow, dependencies) - - # Add operations for intact nodes depending on a node - # belonging to nodes - for node in targeted_nodes: - node_sub_workflow = node_sub_workflows[node.id] - - for relationship in node.outbound_relationships: - target_node = ctx.model.node.get( - relationship.target_node.id) - target_node_subworkflow = node_sub_workflows[target_node.id] - graph.add_dependency(node_sub_workflow, target_node_subworkflow) - - if target_node in failing_nodes: - dependent = task.create_relationship_tasks( - relationship=relationship, - operation_name='aria.interfaces.relationship_lifecycle.establish') - graph.add_tasks(*dependent) - graph.add_dependency(dependent, node_sub_workflow) - - -def _get_contained_subgraph(context, host_node): - contained_instances = [node - for node in context.nodes - if node.host_fk == host_node.id and - node.host_fk != node.id] - result = [host_node] - - if not contained_instances: - return result - - result.extend(contained_instances) - for node in contained_instances: - result.extend(_get_contained_subgraph(context, node)) - - return set(result) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/install.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/install.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/install.py deleted file mode 100644 index 1e7c531..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/install.py +++ /dev/null @@ -1,34 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Built-in install workflow. -""" - -from ... import workflow -from ..api import task as api_task -from . import workflows - - -@workflow -def install(ctx, graph): - """ - Built-in install workflow. - """ - tasks_and_nodes = [] - for node in ctx.nodes: - tasks_and_nodes.append((api_task.WorkflowTask(workflows.install_node, node=node), node)) - graph.add_tasks([task for task, _ in tasks_and_nodes]) - workflows.create_node_task_dependencies(graph, tasks_and_nodes) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/start.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/start.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/start.py deleted file mode 100644 index c02a26d..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/start.py +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Built-in start workflow. -""" - -from .workflows import start_node -from ... import workflow -from ..api import task as api_task - - -@workflow -def start(ctx, graph): - """ - Built-in start workflow. - """ - for node in ctx.model.node.iter(): - graph.add_tasks(api_task.WorkflowTask(start_node, node=node)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/stop.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/stop.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/stop.py deleted file mode 100644 index 6f9930b..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/stop.py +++ /dev/null @@ -1,31 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Built-in stop workflow. -""" - -from .workflows import stop_node -from ..api import task as api_task -from ... import workflow - - -@workflow -def stop(ctx, graph): - """ - Built-in stop workflow. - """ - for node in ctx.model.node.iter(): - graph.add_tasks(api_task.WorkflowTask(stop_node, node=node)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/uninstall.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/uninstall.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/uninstall.py deleted file mode 100644 index 7925f4b..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/uninstall.py +++ /dev/null @@ -1,34 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Built-in uninstall workflow. -""" - -from ... import workflow -from ..api import task as api_task -from . import workflows - - -@workflow -def uninstall(ctx, graph): - """ - Built-in uninstall workflow. - """ - tasks_and_nodes = [] - for node in ctx.nodes: - tasks_and_nodes.append((api_task.WorkflowTask(workflows.uninstall_node, node=node), node)) - graph.add_tasks([task for task, _ in tasks_and_nodes]) - workflows.create_node_task_dependencies(graph, tasks_and_nodes, reverse=True) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca-website/blob/23d6ba76/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/workflows.py b/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/workflows.py deleted file mode 100644 index b286e98..0000000 --- a/apache-ariatosca-0.1.1/aria/orchestrator/workflows/builtin/workflows.py +++ /dev/null @@ -1,149 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -TSOCA normative lifecycle workflows. -""" - -from ... import workflow -from ..api import task - - -NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard' -NORMATIVE_CONFIGURE_INTERFACE = 'Configure' # 'tosca.interfaces.relationship.Configure' - -NORMATIVE_CREATE = 'create' -NORMATIVE_CONFIGURE = 'configure' -NORMATIVE_START = 'start' -NORMATIVE_STOP = 'stop' -NORMATIVE_DELETE = 'delete' - -NORMATIVE_PRE_CONFIGURE_SOURCE = 'pre_configure_source' -NORMATIVE_PRE_CONFIGURE_TARGET = 'pre_configure_target' -NORMATIVE_POST_CONFIGURE_SOURCE = 'post_configure_source' -NORMATIVE_POST_CONFIGURE_TARGET = 'post_configure_target' - -NORMATIVE_ADD_SOURCE = 'add_source' -NORMATIVE_ADD_TARGET = 'add_target' -NORMATIVE_REMOVE_TARGET = 'remove_target' -NORMATIVE_REMOVE_SOURCE = 'remove_source' -NORMATIVE_TARGET_CHANGED = 'target_changed' - - -__all__ = ( - 'NORMATIVE_STANDARD_INTERFACE', - 'NORMATIVE_CONFIGURE_INTERFACE', - 'NORMATIVE_CREATE', - 'NORMATIVE_START', - 'NORMATIVE_STOP', - 'NORMATIVE_DELETE', - 'NORMATIVE_CONFIGURE', - 'NORMATIVE_PRE_CONFIGURE_SOURCE', - 'NORMATIVE_PRE_CONFIGURE_TARGET', - 'NORMATIVE_POST_CONFIGURE_SOURCE', - 'NORMATIVE_POST_CONFIGURE_TARGET', - 'NORMATIVE_ADD_SOURCE', - 'NORMATIVE_ADD_TARGET', - 'NORMATIVE_REMOVE_SOURCE', - 'NORMATIVE_REMOVE_TARGET', - 'NORMATIVE_TARGET_CHANGED', - 'install_node', - 'uninstall_node', - 'start_node', - 'stop_node', -) - - -@workflow(suffix_template='{node.name}') -def install_node(graph, node, **kwargs): - # Create - sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)] - - # Configure - sequence += task.create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_PRE_CONFIGURE_SOURCE, - NORMATIVE_PRE_CONFIGURE_TARGET) - sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE)) - sequence += task.create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_POST_CONFIGURE_SOURCE, - NORMATIVE_POST_CONFIGURE_TARGET) - # Start - sequence += _create_start_tasks(node) - - graph.sequence(*sequence) - - -@workflow(suffix_template='{node.name}') -def uninstall_node(graph, node, **kwargs): - # Stop - sequence = _create_stop_tasks(node) - - # Delete - sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE)) - - graph.sequence(*sequence) - - -@workflow(suffix_template='{node.name}') -def start_node(graph, node, **kwargs): - graph.sequence(*_create_start_tasks(node)) - - -@workflow(suffix_template='{node.name}') -def stop_node(graph, node, **kwargs): - graph.sequence(*_create_stop_tasks(node)) - - -def _create_start_tasks(node): - sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)] - sequence += task.create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET) - return sequence - - -def _create_stop_tasks(node): - sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)] - sequence += task.create_relationships_tasks(node, - NORMATIVE_CONFIGURE_INTERFACE, - NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET) - return sequence - - -def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False): - """ - Creates dependencies between tasks if there is a relationship (outbound) between their nodes. - """ - - def get_task(node_name): - for api_task, task_node in tasks_and_nodes: - if task_node.name == node_name: - return api_task - return None - - for api_task, node in tasks_and_nodes: - dependencies = [] - for relationship in node.outbound_relationships: - dependency = get_task(relationship.target_node.name) - if dependency: - dependencies.append(dependency) - if dependencies: - if reverse: - for dependency in dependencies: - graph.add_dependency(dependency, api_task) - else: - graph.add_dependency(api_task, dependencies)