From: emblemparade@apache.org
To: dev@ariatosca.incubator.apache.org
Date: Tue, 30 May 2017 18:03:20 -0000
Message-Id: <0926a82fbe51485f8e6bfe52eae631ee@git.apache.org>
In-Reply-To: <299a199858c44dc495bc729ff89e61aa@git.apache.org>
References: <299a199858c44dc495bc729ff89e61aa@git.apache.org>
Subject: [5/5] incubator-ariatosca git commit: Fixes

Fixes

* Rename implementation/inputs to function/arguments in Task API
* Rename "create_parameters" to "merge_parameter_values" and improve
* Change workflow "function" back to "implementation"

Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/fdec01ab
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/fdec01ab
Diff:
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/fdec01ab

Branch: refs/heads/ARIA-149-functions-in-operation-configuration
Commit: fdec01ab0f28152d1e8fb9cd8f73b6a063f556f5
Parents: 8fe7f4b
Author: Tal Liron
Authored: Wed May 24 14:54:07 2017 -0500
Committer: Tal Liron
Committed: Tue May 30 13:03:05 2017 -0500

----------------------------------------------------------------------
 aria/cli/execution_logging.py                   |   4 +-
 aria/modeling/exceptions.py                     |   6 +
 aria/modeling/orchestration.py                  |  14 +--
 aria/modeling/service_instance.py               |  11 +-
 aria/modeling/service_template.py               |  25 ++--
 aria/modeling/utils.py                          | 126 ++++++++++---------
 aria/orchestrator/context/operation.py          |   4 +-
 .../execution_plugin/instantiation.py           |  12 +-
 aria/orchestrator/workflow_runner.py            |  20 +--
 aria/orchestrator/workflows/api/task.py         |  89 +++++++++----
 .../workflows/builtin/execute_operation.py      |   2 +-
 aria/orchestrator/workflows/core/task.py        |   4 +-
 aria/orchestrator/workflows/events_logging.py   |   8 +-
 aria/orchestrator/workflows/executor/base.py    |   4 +-
 aria/orchestrator/workflows/executor/celery.py  |   6 +-
 aria/orchestrator/workflows/executor/dry.py     |   6 +-
 aria/orchestrator/workflows/executor/process.py |  12 +-
 aria/orchestrator/workflows/executor/thread.py  |   6 +-
 aria/utils/formatting.py                        |   4 +-
 aria/utils/validation.py                        |   8 +-
 .../profiles/aria-1.0/aria-1.0.yaml             |   8 +-
 .../simple_v1_0/modeling/__init__.py            |  22 ++--
 .../simple_v1_0/modeling/functions.py           |   6 +-
 tests/modeling/test_models.py                   |  12 +-
 tests/orchestrator/context/test_operation.py    |  47 ++++---
 tests/orchestrator/context/test_serialize.py    |   4 +-
 tests/orchestrator/context/test_toolbelt.py     |   4 +-
 .../orchestrator/execution_plugin/test_local.py |   8 +-
 tests/orchestrator/execution_plugin/test_ssh.py |   6 +-
 tests/orchestrator/test_workflow_runner.py      |   8 +-
 tests/orchestrator/workflows/api/test_task.py   |  20 +--
 .../orchestrator/workflows/core/test_engine.py  |  40 +++---
 tests/orchestrator/workflows/core/test_task.py  |   4 +-
 .../test_task_graph_into_execution_graph.py     |   4 +-
 .../orchestrator/workflows/executor/__init__.py |   6 +-
 .../workflows/executor/test_executor.py         |  10 +-
 .../workflows/executor/test_process_executor.py |   2 +-
 ...process_executor_concurrent_modifications.py |   4 +-
 .../executor/test_process_executor_extension.py |  18 +--
 .../test_process_executor_tracked_changes.py    |  14 +--
 .../node-cellar/node-cellar.yaml                |   2 +-
 41 files changed, 340 insertions(+), 280 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/cli/execution_logging.py
----------------------------------------------------------------------
diff --git a/aria/cli/execution_logging.py b/aria/cli/execution_logging.py
index b23165f..b3252f0 100644
--- a/aria/cli/execution_logging.py
+++ b/aria/cli/execution_logging.py
@@ -105,8 +105,8 @@ def stylize_log(item, mark_pattern):
     # implementation
     if item.task:
         # operation task
-        implementation = item.task.implementation
-        inputs = dict(i.unwrap() for i in item.task.inputs.values())
+        implementation = item.task.function
+        inputs = dict(arg.unwrap() for arg in item.task.arguments.values())
     else:
         # execution task
         implementation = item.execution.workflow_name

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/modeling/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/modeling/exceptions.py b/aria/modeling/exceptions.py
index d0e3e22..e784d1a 100644
---
a/aria/modeling/exceptions.py +++ b/aria/modeling/exceptions.py @@ -57,3 +57,9 @@ class UndeclaredParametersException(ParameterException): """ ARIA modeling exception: Undeclared parameters have been provided. """ + + +class ForbiddenParameterNamesException(ParameterException): + """ + ARIA modeling exception: Forbidden parameter names have been used. + """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index ab9d34d..97de552 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -230,10 +230,10 @@ class TaskBase(ModelMixin): :vartype relationship: :class:`Relationship` :ivar plugin: The implementing plugin (set to None for default execution plugin) :vartype plugin: :class:`Plugin` - :ivar inputs: Parameters that can be used by this task - :vartype inputs: {basestring: :class:`Parameter`} - :ivar implementation: Python path to an ``@operation`` function - :vartype implementation: basestring + :ivar function: Python path to an ``@operation`` function + :vartype function: basestring + :ivar arguments: Arguments that can be used by this task + :vartype arguments: {basestring: :class:`Parameter`} :ivar max_attempts: Maximum number of retries allowed in case of failure :vartype max_attempts: int :ivar retry_interval: Interval between retries (in seconds) @@ -300,10 +300,10 @@ class TaskBase(ModelMixin): return relationship.many_to_one(cls, 'execution') @declared_attr - def inputs(cls): - return relationship.many_to_many(cls, 'parameter', prefix='inputs', dict_key='name') + def arguments(cls): + return relationship.many_to_many(cls, 'parameter', prefix='arguments', dict_key='name') - implementation = Column(String) + function = Column(String) max_attempts = Column(Integer, default=1) retry_interval = Column(Float, default=0) ignore_failure = Column(Boolean, default=False) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/modeling/service_instance.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_instance.py b/aria/modeling/service_instance.py index 31f7212..72e2478 100644 --- a/aria/modeling/service_instance.py +++ b/aria/modeling/service_instance.py @@ -1753,17 +1753,18 @@ class OperationBase(InstanceModelMixin): if (self.implementation is None) and (self.function is None): return - if (self.plugin is None) and (self.interface is not None): - # Default to execution plugin ("interface" is None for workflow operations) + if (self.interface is not None) and (self.plugin is None) and (self.function is None): + # ("interface" is None for workflow operations, which do not currently use "plugin") + # The default (None) plugin is the execution plugin execution_plugin.instantiation.configure_operation(self) else: # In the future plugins may be able to add their own "configure_operation" hook that # can validate the configuration and otherwise create specially derived arguments. For - # now, we just send all configuration parameters as arguments + # now, we just send all configuration parameters as arguments without validation. utils.instantiate_dict(self, self.arguments, self.configuration) - # Send all inputs as extra arguments. Note that they will override existing arguments of the - # same names. 
+ # Send all inputs as extra arguments + # Note that they will override existing arguments of the same names utils.instantiate_dict(self, self.arguments, self.inputs) @property http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/modeling/service_template.py ---------------------------------------------------------------------- diff --git a/aria/modeling/service_template.py b/aria/modeling/service_template.py index b4a54ca..42e0d01 100644 --- a/aria/modeling/service_template.py +++ b/aria/modeling/service_template.py @@ -287,7 +287,7 @@ class ServiceTemplateBase(TemplateModelMixin): service_template=self) context.modeling.instance = service - service.inputs = utils.create_parameters(inputs or {}, self.inputs) + service.inputs = utils.merge_parameter_values(inputs, self.inputs) # TODO: now that we have inputs, we should scan properties and inputs and evaluate functions for plugin_specification in self.plugin_specifications.itervalues(): @@ -1883,21 +1883,10 @@ class OperationTemplateBase(TemplateModelMixin): def instantiate(self, container): from . import models - if self.plugin_specification: - if self.plugin_specification.enabled: - plugin = self.plugin_specification.plugin - function = self.function if plugin is not None else None - # "plugin" would be none if a match was not found. In that case, a validation error - # should already have been reported in ServiceTemplateBase.instantiate, so we will - # continue silently here - else: - # If the plugin is disabled, the operation should be disabled, too - plugin = None - function = None - else: - # Using the default execution plugin (plugin=None) - plugin = None - function = self.function + + plugin = self.plugin_specification.plugin \ + if (self.plugin_specification is not None) and self.plugin_specification.enabled \ + else None operation = models.Operation(name=self.name, description=deepcopy_with_locators(self.description), @@ -1906,12 +1895,14 @@ class OperationTemplateBase(TemplateModelMixin): dependencies=self.dependencies, executor=self.executor, plugin=plugin, - function=function, + function=self.function, max_attempts=self.max_attempts, retry_interval=self.retry_interval, operation_template=self) + utils.instantiate_dict(container, operation.inputs, self.inputs) utils.instantiate_dict(container, operation.configuration, self.configuration) + return operation def validate(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/modeling/utils.py ---------------------------------------------------------------------- diff --git a/aria/modeling/utils.py b/aria/modeling/utils.py index 6f4022c..ef9a53a 100644 --- a/aria/modeling/utils.py +++ b/aria/modeling/utils.py @@ -22,6 +22,7 @@ from ..parser.consumption import ConsumptionContext from ..utils.console import puts from ..utils.type import validate_value_type from ..utils.collections import OrderedDict +from ..utils.formatting import string_list_as_string class ModelJSONEncoder(JSONEncoder): @@ -52,84 +53,95 @@ class NodeTemplateContainerHolder(object): return self.container.service_template -def create_parameters(parameters, declared_parameters): +def merge_parameter_values(parameter_values, declared_parameters, forbidden_names=None): """ - Validates, merges, and wraps parameter values according to those declared by a type. + Merges parameter values according to those declared by a type. - Exceptions will be raised for validation errors: + Exceptions will be raised for validation errors. 
- * :class:`aria.modeling.exceptions.UndeclaredParametersException` if a key in ``parameters`` - does not exist in ``declared_parameters`` - * :class:`aria.modeling.exceptions.MissingRequiredParametersException` if a key in - ``declared_parameters`` does not exist in ``parameters`` and also has no default value - * :class:`aria.modeling.exceptions.ParametersOfWrongTypeException` if a value in ``parameters`` - does not match its type in ``declared_parameters`` - - :param parameters: Provided parameter values - :type parameters: {basestring, object} - :param declared_parameters: Declared parameters + :param parameter_values: provided parameter values or None + :type parameter_values: {basestring, object} + :param declared_parameters: declared parameters :type declared_parameters: {basestring, :class:`aria.modeling.models.Parameter`} - :return: The merged parameters + :param forbidden_names: parameters will be validated against these names + :type forbidden_names: [basestring] + :return: the merged parameters :rtype: {basestring, :class:`aria.modeling.models.Parameter`} + :raises aria.modeling.exceptions.UndeclaredParametersException: if a key in ``parameter_values`` + does not exist in ``declared_parameters`` + :raises aria.modeling.exceptions.MissingRequiredParametersException: if a key in + ``declared_parameters`` does not exist in ``parameter_values`` and also has no default + value + :raises aria.modeling.exceptions.ForbiddenParameterNamesException: if a parameter name is in + ``forbidden_names`` + :raises aria.modeling.exceptions.ParametersOfWrongTypeException: if a value in + ``parameter_values`` does not match its type in ``declared_parameters`` """ - merged_parameters = _merge_and_validate_parameters(parameters, declared_parameters) - from . import models - parameters_models = OrderedDict() - for parameter_name, parameter_value in merged_parameters.iteritems(): - parameter = models.Parameter( # pylint: disable=unexpected-keyword-arg - name=parameter_name, - type_name=declared_parameters[parameter_name].type_name, - description=declared_parameters[parameter_name].description, - value=parameter_value) - parameters_models[parameter.name] = parameter - - return parameters_models - - -def _merge_and_validate_parameters(parameters, declared_parameters): - merged_parameters = OrderedDict(parameters) - - missing_parameters = [] - wrong_type_parameters = OrderedDict() - for parameter_name, declared_parameter in declared_parameters.iteritems(): - if parameter_name not in parameters: - if declared_parameter.value is not None: - merged_parameters[parameter_name] = declared_parameter.value # apply default value - else: - missing_parameters.append(parameter_name) - else: - # Validate parameter type + + parameter_values = parameter_values or {} + + undeclared_names = list(set(parameter_values.keys()).difference(declared_parameters.keys())) + if undeclared_names: + raise exceptions.UndeclaredParametersException( + 'Undeclared parameters have been provided: {0}; Declared: {1}' + .format(string_list_as_string(undeclared_names), + string_list_as_string(declared_parameters.keys()))) + + parameters = OrderedDict() + + missing_names = [] + wrong_type_values = OrderedDict() + for declared_parameter_name, declared_parameter in declared_parameters.iteritems(): + if declared_parameter_name in parameter_values: + # Value has been provided + value = parameter_values[declared_parameter_name] + + # Validate type + type_name = declared_parameter.type_name try: - validate_value_type(parameters[parameter_name], 
declared_parameter.type_name) + validate_value_type(value, type_name) except ValueError: - wrong_type_parameters[parameter_name] = declared_parameter.type_name + wrong_type_values[declared_parameter_name] = type_name except RuntimeError: # TODO: This error shouldn't be raised (or caught), but right now we lack support # for custom data_types, which will raise this error. Skipping their validation. pass - if missing_parameters: + # Wrap in Parameter model + parameters[declared_parameter_name] = models.Parameter( # pylint: disable=unexpected-keyword-arg + name=declared_parameter_name, + type_name=type_name, + description=declared_parameter.description, + value=value) + elif declared_parameter.value is not None: + # Copy default value from declaration + parameters[declared_parameter_name] = declared_parameter.instantiate(None) + else: + # Required value has not been provided + missing_names.append(declared_parameter_name) + + if missing_names: raise exceptions.MissingRequiredParametersException( - 'Required parameters {0} have not been specified; Expected parameters: {1}' - .format(missing_parameters, declared_parameters.keys())) + 'Declared parameters {0} have not been provided values' + .format(string_list_as_string(missing_names))) - if wrong_type_parameters: + if forbidden_names: + used_forbidden_names = list(set(forbidden_names).intersection(parameters.keys())) + if used_forbidden_names: + raise exceptions.ForbiddenParameterNamesException( + 'Forbidden parameter names have been used: {0}' + .format(string_list_as_string(used_forbidden_names))) + + if wrong_type_values: error_message = StringIO() - for param_name, param_type in wrong_type_parameters.iteritems(): - error_message.write('Parameter "{0}" must be of type {1}{2}' + for param_name, param_type in wrong_type_values.iteritems(): + error_message.write('Parameter "{0}" is not of declared type "{1}"{2}' .format(param_name, param_type, os.linesep)) raise exceptions.ParametersOfWrongTypeException(error_message.getvalue()) - undeclared_parameters = [parameter_name for parameter_name in parameters.keys() - if parameter_name not in declared_parameters] - if undeclared_parameters: - raise exceptions.UndeclaredParametersException( - 'Undeclared parameters have been specified: {0}; Expected parameters: {1}' - .format(undeclared_parameters, declared_parameters.keys())) - - return merged_parameters + return parameters def coerce_dict_values(the_dict, report_issues=False): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 7c21351..f0ba337 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -42,8 +42,8 @@ class BaseOperationContext(common.BaseContext): self._register_logger(task_id=self.task.id, level=logger_level) def __repr__(self): - details = 'implementation={task.implementation}; ' \ - 'operation_inputs={task.inputs}'\ + details = 'function={task.function}; ' \ + 'operation_arguments={task.arguments}'\ .format(task=self.task) return '{name}({0})'.format(details, name=self.name) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/execution_plugin/instantiation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/execution_plugin/instantiation.py 
b/aria/orchestrator/execution_plugin/instantiation.py index 26c3913..9b5152d 100644 --- a/aria/orchestrator/execution_plugin/instantiation.py +++ b/aria/orchestrator/execution_plugin/instantiation.py @@ -17,6 +17,7 @@ from ...utils.type import full_type_name from ...utils.formatting import safe_repr +from ...utils.collections import OrderedDict from ...parser import validation from ...parser.consumption import ConsumptionContext from ...modeling.functions import Function @@ -43,7 +44,7 @@ def configure_operation(operation): # kwargs in either "run_script_locally" or "run_script_with_ssh" for key, value in operation.configuration.iteritems(): if key not in ('process', 'ssh'): - operation.arguments[key] = value.instantiate() + operation.arguments[key] = value.instantiate(None) def _configure_common(operation): @@ -133,6 +134,7 @@ def _get_process(operation): if value is None: return {} _validate_type(value, dict, 'process') + value = OrderedDict(value) for k, v in value.iteritems(): if k == 'eval_python': value[k] = _coerce_bool(v, 'process.eval_python') @@ -146,17 +148,19 @@ def _get_process(operation): _validate_type(v, dict, 'process.env') else: context = ConsumptionContext.get_thread_local() - context.validation.report('unsupported configuration: "process.{0}"'.format(k), + context.validation.report('unsupported configuration parameter: "process.{0}"' + .format(k), level=validation.Issue.BETWEEN_TYPES) return value def _get_ssh(operation): value = operation.configuration.get('ssh')._value \ - if 'process' in operation.configuration else None + if 'ssh' in operation.configuration else None if value is None: return {} _validate_type(value, dict, 'ssh') + value = OrderedDict(value) for k, v in value.iteritems(): if k == 'use_sudo': value[k] = _coerce_bool(v, 'ssh.use_sudo') @@ -176,7 +180,7 @@ def _get_ssh(operation): _validate_type(v, basestring, 'ssh.address') else: context = ConsumptionContext.get_thread_local() - context.validation.report('unsupported configuration: "ssh.{0}"'.format(k), + context.validation.report('unsupported configuration parameter: "ssh.{0}"'.format(k), level=validation.Issue.BETWEEN_TYPES) return value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 0c6321f..2d373c8 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -42,9 +42,10 @@ class WorkflowRunner(object): executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): """ - Manages a single workflow execution on a given service + Manages a single workflow execution on a given service. 
+ :param workflow_name: Workflow name - :param service_id: Service id + :param service_id: Service ID :param inputs: A key-value dict of inputs for the execution :param model_storage: Model storage :param resource_storage: Resource storage @@ -64,7 +65,7 @@ class WorkflowRunner(object): self._validate_workflow_exists_for_service() - workflow_fn = self._get_workflow_fn() + workflow_fn = self._workflow_fn execution = self._create_execution_model(inputs) self._execution_id = execution.id @@ -119,7 +120,7 @@ class WorkflowRunner(object): else: workflow_inputs = self.service.workflows[self._workflow_name].inputs - execution.inputs = modeling_utils.create_parameters(inputs, workflow_inputs) + execution.inputs = modeling_utils.merge_parameter_values(inputs, workflow_inputs) # TODO: these two following calls should execute atomically self._validate_no_active_executions(execution) self._model_storage.execution.put(execution) @@ -136,10 +137,11 @@ class WorkflowRunner(object): active_executions = [e for e in self.service.executions if e.is_active()] if active_executions: raise exceptions.ActiveExecutionsError( - "Can't start execution; Service {0} has an active execution with id {1}" + "Can't start execution; Service {0} has an active execution with ID {1}" .format(self.service.name, active_executions[0].id)) - def _get_workflow_fn(self): + @property + def _workflow_fn(self): if self._workflow_name in builtin.BUILTIN_WORKFLOWS: return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX, self._workflow_name)) @@ -156,10 +158,10 @@ class WorkflowRunner(object): sys.path.append(service_template_resources_path) try: - workflow_fn = import_fullname(workflow.implementation) + workflow_fn = import_fullname(workflow.function) except ImportError: raise exceptions.WorkflowImplementationNotFoundError( - 'Could not find workflow {0} implementation at {1}'.format( - self._workflow_name, workflow.implementation)) + 'Could not find workflow {0} function at {1}'.format( + self._workflow_name, workflow.function)) return workflow_fn http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index aa6ac45..feacaf4 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -55,7 +55,28 @@ class BaseTask(object): class OperationTask(BaseTask): """ - Represents an operation task in the task graph + Represents an operation task in the task graph. 
+ + :ivar name: formatted name (includes actor type, actor name, and interface/operation names) + :vartype name: basestring + :ivar actor: node or relationship + :vartype actor: :class:`aria.modeling.models.Node`|:class:`aria.modeling.models.Relationship` + :ivar interface_name: interface name on actor + :vartype interface_name: basestring + :ivar operation_name: operation name on interface + :vartype operation_name: basestring + :ivar plugin: plugin (or None for default plugin) + :vartype plugin: :class:`aria.modeling.models.Plugin` + :ivar function: path to Python function + :vartype function: basestring + :ivar arguments: arguments to send to Python function + :vartype arguments: {basestring, :class:`aria.modeling.models.Parameter`} + :ivar ignore_failure: whether to ignore failures + :vartype ignore_failure: bool + :ivar max_attempts: maximum number of attempts allowed in case of failure + :vartype max_attempts: int + :ivar retry_interval: interval between retries (in seconds) + :vartype retry_interval: int """ NAME_FORMAT = '{interface}:{operation}@{type}:{name}' @@ -64,43 +85,61 @@ class OperationTask(BaseTask): actor, interface_name, operation_name, - inputs=None, + arguments=None, + ignore_failure=None, max_attempts=None, - retry_interval=None, - ignore_failure=None): + retry_interval=None): """ - Do not call this constructor directly. Instead, use :meth:`for_node` or - :meth:`for_relationship`. + :param actor: node or relationship + :type actor: :class:`aria.modeling.models.Node`|:class:`aria.modeling.models.Relationship` + :param interface_name: interface name on actor + :type interface_name: basestring + :param operation_name: operation name on interface + :type operation_name: basestring + :param arguments: override argument values + :type arguments: {basestring, object} + :param ignore_failure: override whether to ignore failures + :type ignore_failure: bool + :param max_attempts: override maximum number of attempts allowed in case of failure + :type max_attempts: int + :param retry_interval: override interval between retries (in seconds) + :type retry_interval: int + :raises aria.orchestrator.workflows.exceptions.OperationNotFoundException: if + ``interface_name`` and ``operation_name`` to not refer to an operation on the actor """ + assert isinstance(actor, (models.Node, models.Relationship)) - super(OperationTask, self).__init__() - self.actor = actor - self.interface_name = interface_name - self.operation_name = operation_name - self.max_attempts = max_attempts or self.workflow_context._task_max_attempts - self.retry_interval = retry_interval or self.workflow_context._task_retry_interval - self.ignore_failure = \ - self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure - self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(), - name=actor.name, - interface=self.interface_name, - operation=self.operation_name) + # Creating OperationTask directly should raise an error when there is no # interface/operation. 
- - if not has_operation(self.actor, self.interface_name, self.operation_name): + if not has_operation(actor, interface_name, operation_name): raise exceptions.OperationNotFoundException( - 'Could not find operation "{self.operation_name}" on interface ' - '"{self.interface_name}" for {actor_type} "{actor.name}"'.format( - self=self, + 'Could not find operation "{operation_name}" on interface ' + '"{interface_name}" for {actor_type} "{actor.name}"'.format( + operation_name=operation_name, + interface_name=interface_name, actor_type=type(actor).__name__.lower(), actor=actor) ) + super(OperationTask, self).__init__() + + self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(), + name=actor.name, + interface=interface_name, + operation=operation_name) + self.actor = actor + self.interface_name = interface_name + self.operation_name = operation_name + self.ignore_failure = \ + self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure + self.max_attempts = max_attempts or self.workflow_context._task_max_attempts + self.retry_interval = retry_interval or self.workflow_context._task_retry_interval + operation = self.actor.interfaces[self.interface_name].operations[self.operation_name] self.plugin = operation.plugin - self.inputs = modeling_utils.create_parameters(inputs or {}, operation.arguments) - self.implementation = operation.function + self.function = operation.function + self.arguments = modeling_utils.merge_parameter_values(arguments, operation.arguments) def __repr__(self): return self.name http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/workflows/builtin/execute_operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py index 02a654a..437e584 100644 --- a/aria/orchestrator/workflows/builtin/execute_operation.py +++ b/aria/orchestrator/workflows/builtin/execute_operation.py @@ -69,7 +69,7 @@ def execute_operation( node, interface_name=interface_name, operation_name=operation_name, - inputs=operation_kwargs + arguments=operation_kwargs ) ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 0d6eb11..72d83ea 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -146,8 +146,8 @@ class OperationTask(BaseTask): # Only non-stub tasks have these fields plugin=api_task.plugin, - implementation=api_task.implementation, - inputs=api_task.inputs + function=api_task.function, + arguments=api_task.arguments ) self._workflow_context.model.task.put(task_model) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/workflows/events_logging.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py index 236a55f..0c93b85 100644 --- a/aria/orchestrator/workflows/events_logging.py +++ b/aria/orchestrator/workflows/events_logging.py @@ -35,12 +35,12 @@ def _get_task_name(task): @events.start_task_signal.connect def _start_task_handler(task, **kwargs): - # If the task has not implementation this is an empty task. 
- if task.implementation: + # If the task has no function this is an empty task. + if task.function: suffix = 'started...' logger = task.context.logger.info else: - suffix = 'has no implementation' + suffix = 'has no function' logger = task.context.logger.debug logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format( @@ -48,7 +48,7 @@ def _start_task_handler(task, **kwargs): @events.on_success_task_signal.connect def _success_task_handler(task, **kwargs): - if not task.implementation: + if not task.function: return task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful' .format(name=_get_task_name(task), task=task)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index c543278..7fece6f 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -33,10 +33,10 @@ class BaseExecutor(logger.LoggerMixin): Execute a task :param task: task to execute """ - if task.implementation: + if task.function: self._execute(task) else: - # In this case the task is missing an implementation. This task still gets to an + # In this case the task is missing a function. This task still gets to an # executor, but since there is nothing to run, we by default simply skip the execution # itself. self._task_started(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py index bbddc25..3935b07 100644 --- a/aria/orchestrator/workflows/executor/celery.py +++ b/aria/orchestrator/workflows/executor/celery.py @@ -44,11 +44,11 @@ class CeleryExecutor(BaseExecutor): def _execute(self, task): self._tasks[task.id] = task - inputs = dict(inp.unwrap() for inp in task.inputs.values()) - inputs['ctx'] = task.context + arguments = dict(arg.unwrap() for arg in task.arguments.values()) + arguments['ctx'] = task.context self._results[task.id] = self._app.send_task( task.operation_mapping, - kwargs=inputs, + kwargs=arguments, task_id=task.id, queue=self._get_queue(task)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index 63ec392..8848df8 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -33,7 +33,7 @@ class DryExecutor(BaseExecutor): task.status = task.STARTED dry_msg = ' {name} {task.interface_name}.{task.operation_name} {suffix}' - logger = task.context.logger.info if task.implementation else task.context.logger.debug + logger = task.context.logger.info if task.function else task.context.logger.debug if hasattr(task.actor, 'source_node'): name = '{source_node.name}->{target_node.name}'.format( @@ -41,11 +41,11 @@ class DryExecutor(BaseExecutor): else: name = task.actor.name - if task.implementation: + if task.function: logger(dry_msg.format(name=name, task=task, suffix='started...')) logger(dry_msg.format(name=name, task=task, 
suffix='successful')) else: - logger(dry_msg.format(name=name, task=task, suffix='has no implementation')) + logger(dry_msg.format(name=name, task=task, suffix='has no function')) # updating the task manually instead of calling self._task_succeeded(task), # to avoid any side effects raising that event might cause http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index f02e0a6..7472a2e 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -140,8 +140,8 @@ class ProcessExecutor(base.BaseExecutor): def _create_arguments_dict(self, task): return { 'task_id': task.id, - 'implementation': task.implementation, - 'operation_inputs': dict(inp.unwrap() for inp in task.inputs.values()), + 'function': task.function, + 'operation_arguments': dict(arg.unwrap() for arg in task.arguments.values()), 'port': self._server_port, 'context': task.context.serialization_dict, } @@ -290,8 +290,8 @@ def _main(): port = arguments['port'] messenger = _Messenger(task_id=task_id, port=port) - implementation = arguments['implementation'] - operation_inputs = arguments['operation_inputs'] + function = arguments['function'] + operation_arguments = arguments['operation_arguments'] context_dict = arguments['context'] try: @@ -302,11 +302,11 @@ def _main(): try: messenger.started() - task_func = imports.load_attribute(implementation) + task_func = imports.load_attribute(function) aria.install_aria_extensions() for decorate in process_executor.decorate(): task_func = decorate(task_func) - task_func(ctx=ctx, **operation_inputs) + task_func(ctx=ctx, **operation_arguments) ctx.close() messenger.succeeded() except BaseException as e: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index f53362a..2c5ef16 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -60,9 +60,9 @@ class ThreadExecutor(BaseExecutor): task = self._queue.get(timeout=1) self._task_started(task) try: - task_func = imports.load_attribute(task.implementation) - inputs = dict(inp.unwrap() for inp in task.inputs.values()) - task_func(ctx=task.context, **inputs) + task_func = imports.load_attribute(task.function) + arguments = dict(arg.unwrap() for arg in task.arguments.values()) + task_func(ctx=task.context, **arguments) self._task_succeeded(task) except BaseException as e: self._task_failed(task, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/utils/formatting.py ---------------------------------------------------------------------- diff --git a/aria/utils/formatting.py b/aria/utils/formatting.py index f96a4ce..b8d24cd 100644 --- a/aria/utils/formatting.py +++ b/aria/utils/formatting.py @@ -124,7 +124,9 @@ def string_list_as_string(strings): Nice representation of a list of strings. 
""" - return ', '.join('"%s"' % safe_str(v) for v in strings) + if not strings: + return 'none' + return ', '.join('"{0}"'.format(safe_str(v)) for v in strings) def pluralize(noun): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/aria/utils/validation.py ---------------------------------------------------------------------- diff --git a/aria/utils/validation.py b/aria/utils/validation.py index a33f7a2..193cb33 100644 --- a/aria/utils/validation.py +++ b/aria/utils/validation.py @@ -17,6 +17,8 @@ Contains validation related utilities """ +from .formatting import string_list_as_string + class ValidatorMixin(object): """ @@ -82,8 +84,8 @@ def validate_function_arguments(func, func_kwargs): for arg in non_default_args: if arg not in func_kwargs: raise ValueError( - "The argument '{arg}' doest not have a default value, and it " - "isn't passed to {func.__name__}".format(arg=arg, func=func)) + 'The argument "{arg}" is not provided and does not have a default value for ' + 'function "{func.__name__}"'.format(arg=arg, func=func)) # check if there are any extra kwargs extra_kwargs = [arg for arg in func_kwargs.keys() if arg not in args] @@ -91,5 +93,5 @@ def validate_function_arguments(func, func_kwargs): # assert that the function has kwargs if extra_kwargs and not has_kwargs: raise ValueError("The following extra kwargs were supplied: {extra_kwargs}".format( - extra_kwargs=extra_kwargs + extra_kwargs=string_list_as_string(extra_kwargs) )) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/extensions/aria_extension_tosca/profiles/aria-1.0/aria-1.0.yaml ---------------------------------------------------------------------- diff --git a/extensions/aria_extension_tosca/profiles/aria-1.0/aria-1.0.yaml b/extensions/aria_extension_tosca/profiles/aria-1.0/aria-1.0.yaml index abac03b..c1dc11d 100644 --- a/extensions/aria_extension_tosca/profiles/aria-1.0/aria-1.0.yaml +++ b/extensions/aria_extension_tosca/profiles/aria-1.0/aria-1.0.yaml @@ -52,10 +52,10 @@ policy_types: should be inherited and extended with additional properties. derived_from: tosca.policies.Root properties: - function: + implementation: description: >- - The interpretation of the function string depends on the orchestrator. In ARIA it is the - full path to a Python @workflow function that generates a task graph based on the service - topology. + The interpretation of the implementation string depends on the orchestrator. In ARIA it is + the full path to a Python @workflow function that generates a task graph based on the + service topology. 
type: string required: true http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py ---------------------------------------------------------------------- diff --git a/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py b/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py index 0b04fdc..c88bf41 100644 --- a/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py +++ b/extensions/aria_extension_tosca/simple_v1_0/modeling/__init__.py @@ -378,7 +378,7 @@ def create_operation_template_model(context, service_template, operation): implementation = operation.implementation if implementation is not None: primary = implementation.primary - set_implementation(context, service_template, operation, model, primary) + extract_implementation_primary(context, service_template, operation, model, primary) relationship_edge = operation._get_extensions(context).get('relationship_edge') if relationship_edge is not None: if relationship_edge == 'source': @@ -392,6 +392,8 @@ def create_operation_template_model(context, service_template, operation): for dependency in dependencies: key, value = split_prefix(dependency) if key is not None: + # Special ARIA prefix: signifies configuration parameters + # Parse as YAML try: value = yaml.load(value) @@ -512,15 +514,13 @@ def create_workflow_operation_template_model(context, service_template, policy): properties = policy._get_property_values(context) for prop_name, prop in properties.iteritems(): - if prop_name == 'function': + if prop_name == 'implementation': model.function = prop.value - elif prop_name == 'dependencies': - model.dependencies = prop.value else: - model.configuration[prop_name] = Parameter(name=prop_name, # pylint: disable=unexpected-keyword-arg - type_name=prop.type, - value=prop.value, - description=prop.description) + model.inputs[prop_name] = Parameter(name=prop_name, # pylint: disable=unexpected-keyword-arg + type_name=prop.type, + value=prop.value, + description=prop.description) return model @@ -667,7 +667,7 @@ def split_prefix(string): split = IMPLEMENTATION_PREFIX_REGEX.split(string, 1) if len(split) < 2: return None, None - return split[0].strip(), split[1].lstrip() + return split[0].strip(), split[1].strip() def set_nested(the_dict, keys, value): @@ -693,7 +693,7 @@ def set_nested(the_dict, keys, value): set_nested(the_dict[key], keys, value) -def set_implementation(context, service_template, presentation, model, primary): +def extract_implementation_primary(context, service_template, presentation, model, primary): prefix, postfix = split_prefix(primary) if prefix: # Special ARIA prefix @@ -706,5 +706,5 @@ def set_implementation(context, service_template, presentation, model, primary): locator=presentation._get_child_locator('properties', 'implementation'), level=Issue.BETWEEN_TYPES) else: - # Standard TOSCA artifact + # Standard TOSCA artifact with default plugin model.implementation = primary http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/extensions/aria_extension_tosca/simple_v1_0/modeling/functions.py ---------------------------------------------------------------------- diff --git a/extensions/aria_extension_tosca/simple_v1_0/modeling/functions.py b/extensions/aria_extension_tosca/simple_v1_0/modeling/functions.py index 7089ed9..7be5bf6 100644 --- a/extensions/aria_extension_tosca/simple_v1_0/modeling/functions.py +++ b/extensions/aria_extension_tosca/simple_v1_0/modeling/functions.py @@ -13,7 
+13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from cStringIO import StringIO +from StringIO import StringIO # Note: cStringIO does not support Unicode import re from aria.utils.collections import FrozenList @@ -69,7 +69,9 @@ class Concat(Function): e, final = evaluate(e, final, container_holder) if e is not None: value.write(unicode(e)) - value = value.getvalue() + value = value.getvalue() or u'' + from aria.utils.console import puts + puts(safe_repr(value)) return Evaluation(value, final) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/modeling/test_models.py ---------------------------------------------------------------------- diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py index 57511dd..df3aebd 100644 --- a/tests/modeling/test_models.py +++ b/tests/modeling/test_models.py @@ -755,7 +755,7 @@ class TestTask(object): @pytest.mark.parametrize( 'is_valid, status, due_at, started_at, ended_at, max_attempts, attempts_count, ' - 'retry_interval, ignore_failure, name, operation_mapping, inputs, plugin_id', + 'retry_interval, ignore_failure, name, operation_mapping, arguments, plugin_id', [ (False, m_cls, now, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'), (False, Task.STARTED, m_cls, now, now, 1, 1, 1, True, 'name', 'map', {}, '1'), @@ -784,7 +784,7 @@ class TestTask(object): ) def test_task_model_creation(self, execution_storage, is_valid, status, due_at, started_at, ended_at, max_attempts, attempts_count, retry_interval, - ignore_failure, name, operation_mapping, inputs, plugin_id): + ignore_failure, name, operation_mapping, arguments, plugin_id): task = _test_model( is_valid=is_valid, storage=execution_storage, @@ -800,8 +800,8 @@ class TestTask(object): retry_interval=retry_interval, ignore_failure=ignore_failure, name=name, - implementation=operation_mapping, - inputs=inputs, + function=operation_mapping, + arguments=arguments, plugin_fk=plugin_id, )) if is_valid: @@ -813,8 +813,8 @@ class TestTask(object): def create_task(max_attempts): Task(execution_fk='eid', name='name', - implementation='', - inputs={}, + function='', + arguments={}, max_attempts=max_attempts) create_task(max_attempts=1) create_task(max_attempts=2) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py index 7dbdd04..eec75da 100644 --- a/tests/orchestrator/context/test_operation.py +++ b/tests/orchestrator/context/test_operation.py @@ -97,7 +97,7 @@ def test_node_operation_task_execution(ctx, thread_executor, dataholder): node, interface_name=interface_name, operation_name=operation_name, - inputs=arguments + arguments=arguments ) ) @@ -115,8 +115,8 @@ def test_node_operation_task_execution(ctx, thread_executor, dataholder): ) operations = interface.operations assert len(operations) == 1 - assert dataholder['implementation'] == operations.values()[0].function # pylint: disable=no-member - assert dataholder['inputs']['putput'] is True + assert dataholder['function'] == operations.values()[0].function # pylint: disable=no-member + assert dataholder['arguments']['putput'] is True # Context based attributes (sugaring) assert dataholder['template_name'] == node.node_template.name @@ -147,7 +147,7 @@ def test_relationship_operation_task_execution(ctx, 
thread_executor, dataholder) relationship, interface_name=interface_name, operation_name=operation_name, - inputs=arguments + arguments=arguments ) ) @@ -159,8 +159,8 @@ def test_relationship_operation_task_execution(ctx, thread_executor, dataholder) assert dataholder['actor_name'] == relationship.name assert interface_name in dataholder['task_name'] operations = interface.operations - assert dataholder['implementation'] == operations.values()[0].function # pylint: disable=no-member - assert dataholder['inputs']['putput'] is True + assert dataholder['function'] == operations.values()[0].function # pylint: disable=no-member + assert dataholder['arguments']['putput'] is True # Context based attributes (sugaring) dependency_node_template = ctx.model.node_template.get_by_name( @@ -252,7 +252,7 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir): node, interface_name=interface_name, operation_name=operation_name, - inputs=arguments)) + arguments=arguments)) execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor) expected_file = tmpdir.join('workdir', 'plugins', str(ctx.service.id), @@ -301,7 +301,7 @@ def test_node_operation_logging(ctx, executor): node, interface_name=interface_name, operation_name=operation_name, - inputs=arguments + arguments=arguments ) ) @@ -334,7 +334,7 @@ def test_relationship_operation_logging(ctx, executor): relationship, interface_name=interface_name, operation_name=operation_name, - inputs=arguments + arguments=arguments ) ) @@ -348,15 +348,15 @@ def test_attribute_consumption(ctx, executor, dataholder): source_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) - inputs = {'dict_': {'key': 'value'}, - 'set_test_dict': {'key2': 'value2'}} + arguments = {'dict_': {'key': 'value'}, + 'set_test_dict': {'key2': 'value2'}} interface = mock.models.create_interface( source_node.service, node_int_name, node_op_name, operation_kwargs=dict( - implementation=op_path(attribute_altering_operation, module_path=__name__), - inputs=inputs) + function=op_path(attribute_altering_operation, module_path=__name__), + arguments=arguments) ) source_node.interfaces[interface.name] = interface ctx.model.node.update(source_node) @@ -371,8 +371,8 @@ def test_attribute_consumption(ctx, executor, dataholder): rel_int_name, rel_op_name, operation_kwargs=dict( - implementation=op_path(attribute_consuming_operation, module_path=__name__), - inputs={'holder_path': dataholder.path} + function=op_path(attribute_consuming_operation, module_path=__name__), + arguments={'holder_path': dataholder.path} ) ) relationship.interfaces[interface.name] = interface @@ -386,7 +386,7 @@ def test_attribute_consumption(ctx, executor, dataholder): source_node, interface_name=node_int_name, operation_name=node_op_name, - inputs=inputs + arguments=arguments ), api.task.OperationTask( relationship, @@ -410,8 +410,7 @@ def test_attribute_consumption(ctx, executor, dataholder): dataholder['key2'] == 'value2' -def _assert_loggins(ctx, inputs): - +def _assert_loggins(ctx, arguments): # The logs should contain the following: Workflow Start, Operation Start, custom operation # log string (op_start), custom operation log string (op_end), Operation End, Workflow End. 
@@ -431,11 +430,11 @@ def _assert_loggins(ctx, inputs): assert all(l.execution == execution for l in logs) assert all(l in logs and l.task == task for l in task.logs) - op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info'] + op_start_log = [l for l in logs if arguments['op_start'] in l.msg and l.level.lower() == 'info'] assert len(op_start_log) == 1 op_start_log = op_start_log[0] - op_end_log = [l for l in logs if inputs['op_end'] in l.msg and l.level.lower() == 'debug'] + op_end_log = [l for l in logs if arguments['op_end'] in l.msg and l.level.lower() == 'debug'] assert len(op_end_log) == 1 op_end_log = op_end_log[0] @@ -444,10 +443,10 @@ def _assert_loggins(ctx, inputs): @operation def logged_operation(ctx, **_): - ctx.logger.info(ctx.task.inputs['op_start'].value) + ctx.logger.info(ctx.task.arguments['op_start'].value) # enables to check the relation between the created_at field properly time.sleep(1) - ctx.logger.debug(ctx.task.inputs['op_end'].value) + ctx.logger.debug(ctx.task.arguments['op_end'].value) @operation @@ -476,8 +475,8 @@ def operation_common(ctx, holder): holder['actor_name'] = ctx.task.actor.name holder['task_name'] = ctx.task.name - holder['implementation'] = ctx.task.implementation - holder['inputs'] = dict(i.unwrap() for i in ctx.task.inputs.values()) + holder['function'] = ctx.task.function + holder['arguments'] = dict(i.unwrap() for i in ctx.task.arguments.values()) @operation http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index 946b0bd..4db7bf4 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -60,8 +60,8 @@ def _mock_workflow(ctx, graph): def _mock_operation(ctx): # We test several things in this operation # ctx.task, ctx.node, etc... 
tell us that the model storage was properly re-created - # a correct ctx.task.implementation tells us we kept the correct task_id - assert ctx.task.implementation == _operation_mapping() + # a correct ctx.task.function tells us we kept the correct task_id + assert ctx.task.function == _operation_mapping() # a correct ctx.node.name tells us we kept the correct actor_id assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME # a correct ctx.name tells us we kept the correct name http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/context/test_toolbelt.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py index 26a15e5..326ce83 100644 --- a/tests/orchestrator/context/test_toolbelt.py +++ b/tests/orchestrator/context/test_toolbelt.py @@ -105,7 +105,7 @@ def test_host_ip(workflow_context, executor, dataholder): dependency_node, interface_name=interface_name, operation_name=operation_name, - inputs=arguments + arguments=arguments ) ) @@ -136,7 +136,7 @@ def test_relationship_tool_belt(workflow_context, executor, dataholder): relationship, interface_name=interface_name, operation_name=operation_name, - inputs=arguments + arguments=arguments ) ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 0dfd512..d792a57 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -199,7 +199,7 @@ if __name__ == '__main__': props = self._run( executor, workflow_context, script_path=script_path, - inputs={'key': 'value'}) + arguments={'key': 'value'}) assert props['key'].value == 'value' @pytest.mark.parametrize( @@ -460,10 +460,10 @@ if __name__ == '__main__': script_path, process=None, env_var='value', - inputs=None): + arguments=None): local_script_path = script_path script_path = os.path.basename(local_script_path) if local_script_path else '' - arguments = inputs or {} + arguments = arguments or {} process = process or {} if script_path: workflow_context.resource.service.upload( @@ -495,7 +495,7 @@ if __name__ == '__main__': node, interface_name='test', operation_name='op', - inputs=arguments)) + arguments=arguments)) return graph tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter eng = engine.Engine( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index a369f8f..899a007 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -243,13 +243,13 @@ class TestWithActualSSHServer(object): ops = [] for test_operation in test_operations: - op_inputs = arguments.copy() - op_inputs['test_operation'] = test_operation + op_arguments = arguments.copy() + op_arguments['test_operation'] = test_operation ops.append(api.task.OperationTask( node, interface_name='test', operation_name='op', - inputs=op_inputs)) + arguments=op_arguments)) graph.sequence(*ops) return graph 
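For reference, the construction pattern these test changes exercise looks roughly like this after the rename. This is a minimal sketch, not part of the commit: the node name is made up, the import paths follow the project's test modules, and the interface/operation are assumed to be declared on the actor beforehand, since OperationTask now raises OperationNotFoundException when they are not.

from aria.orchestrator import workflow
from aria.orchestrator.workflows import api


@workflow
def run_op(ctx, graph, **_):
    # Look up the actor; 'my_node' is an illustrative name
    node = ctx.model.node.get_by_name('my_node')
    # "arguments" replaces the old "inputs" keyword throughout the Task API
    graph.add_tasks(api.task.OperationTask(
        node,
        interface_name='test',
        operation_name='op',
        arguments={'key': 'value'},
        max_attempts=2,
        retry_interval=1))
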
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 405cb80..3646339 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -48,8 +48,7 @@ def test_missing_workflow_implementation(service, request):
     workflow = models.Operation(
         name='test_workflow',
         service=service,
-        implementation='nonexistent.workflow.implementation',
-        inputs={})
+        function='nonexistent.workflow.implementation')
     service.workflows['test_workflow'] = workflow
 
     with pytest.raises(exceptions.WorkflowImplementationNotFoundError):
@@ -259,8 +258,9 @@ def _setup_mock_workflow_in_service(request, inputs=None):
     workflow = models.Operation(
         name=mock_workflow_name,
         service=service,
-        implementation='workflow.mock_workflow',
-        inputs=inputs or {})
+        function='workflow.mock_workflow',
+        inputs=inputs or {},
+        arguments=inputs or {})
     service.workflows[mock_workflow_name] = workflow
 
     return mock_workflow_name
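The hunks above also show the shape of a workflow model after the rename: the dotted path to the Python callable is stored as function, and the mock setup passes both the declared inputs and the actual arguments. A sketch of registering a workflow on a service, assuming service is an existing service model instance already in scope (the workflow name and dotted path are illustrative):

    from aria.modeling import models

    workflow = models.Operation(
        name='my_maintenance_workflow',
        service=service,  # an existing Service model instance (assumed)
        function='my_package.workflows.maintenance',
        inputs={},       # declared, user-facing inputs
        arguments={})    # values actually passed to the function
    service.workflows['my_maintenance_workflow'] = workflow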
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py
index d57e424..9d91b6b 100644
--- a/tests/orchestrator/workflows/api/test_task.py
+++ b/tests/orchestrator/workflows/api/test_task.py
@@ -66,7 +66,7 @@ class TestOperationTask(object):
                 node,
                 interface_name=interface_name,
                 operation_name=operation_name,
-                inputs=arguments,
+                arguments=arguments,
                 max_attempts=max_attempts,
                 retry_interval=retry_interval,
                 ignore_failure=ignore_failure)
@@ -77,9 +77,9 @@ class TestOperationTask(object):
             interface=interface_name,
             operation=operation_name
         )
-        assert api_task.implementation == 'op_path'
+        assert api_task.function == 'op_path'
         assert api_task.actor == node
-        assert api_task.inputs['test_input'].value is True
+        assert api_task.arguments['test_input'].value is True
         assert api_task.retry_interval == retry_interval
         assert api_task.max_attempts == max_attempts
         assert api_task.ignore_failure == ignore_failure
@@ -113,7 +113,7 @@ class TestOperationTask(object):
             relationship,
             interface_name=interface_name,
             operation_name=operation_name,
-            inputs=arguments,
+            arguments=arguments,
             max_attempts=max_attempts,
             retry_interval=retry_interval)
@@ -123,9 +123,9 @@ class TestOperationTask(object):
             interface=interface_name,
             operation=operation_name
         )
-        assert api_task.implementation == 'op_path'
+        assert api_task.function == 'op_path'
         assert api_task.actor == relationship
-        assert api_task.inputs['test_input'].value is True
+        assert api_task.arguments['test_input'].value is True
         assert api_task.retry_interval == retry_interval
         assert api_task.max_attempts == max_attempts
         assert api_task.plugin.name == 'test_plugin'
@@ -158,7 +158,7 @@ class TestOperationTask(object):
             relationship,
             interface_name=interface_name,
             operation_name=operation_name,
-            inputs=arguments,
+            arguments=arguments,
             max_attempts=max_attempts,
             retry_interval=retry_interval)
@@ -168,9 +168,9 @@ class TestOperationTask(object):
             interface=interface_name,
             operation=operation_name
         )
-        assert api_task.implementation == 'op_path'
+        assert api_task.function == 'op_path'
         assert api_task.actor == relationship
-        assert api_task.inputs['test_input'].value is True
+        assert api_task.arguments['test_input'].value is True
         assert api_task.retry_interval == retry_interval
         assert api_task.max_attempts == max_attempts
         assert api_task.plugin.name == 'test_plugin'
@@ -198,7 +198,7 @@ class TestOperationTask(object):
             interface_name=interface_name,
             operation_name=operation_name)
 
-        assert task.inputs == {}
+        assert task.arguments == {}
         assert task.retry_interval == ctx._task_retry_interval
         assert task.max_attempts == ctx._task_max_attempts
         assert task.ignore_failure == ctx._task_ignore_failure

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 43ec9f1..6d2836c 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -57,7 +57,7 @@ class BaseTest(object):
     @staticmethod
     def _op(ctx,
             func,
-            inputs=None,
+            arguments=None,
             max_attempts=None,
             retry_interval=None,
             ignore_failure=None):
@@ -65,9 +65,9 @@ class BaseTest(object):
         interface_name = 'aria.interfaces.lifecycle'
         operation_kwargs = dict(function='{name}.{func.__name__}'.format(
             name=__name__, func=func))
-        if inputs:
+        if arguments:
             # the operation has to declare the arguments before those may be passed
-            operation_kwargs['arguments'] = inputs
+            operation_kwargs['arguments'] = arguments
         operation_name = 'create'
         interface = mock.models.create_interface(node.service, interface_name, operation_name,
                                                  operation_kwargs=operation_kwargs)
@@ -77,7 +77,7 @@ class BaseTest(object):
             node,
             interface_name='aria.interfaces.lifecycle',
             operation_name=operation_name,
-            inputs=inputs or {},
+            arguments=arguments,
             max_attempts=max_attempts,
             retry_interval=retry_interval,
             ignore_failure=ignore_failure,
@@ -189,8 +189,8 @@ class TestEngine(BaseTest):
     def test_two_tasks_execution_order(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1})
-            op2 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2})
+            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
+            op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
             graph.sequence(op1, op2)
         self._execute(
             workflow_func=mock_workflow,
@@ -204,9 +204,9 @@ class TestEngine(BaseTest):
     def test_stub_and_subworkflow_execution(self, workflow_context, executor):
         @workflow
         def sub_workflow(ctx, graph):
-            op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1})
+            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
             op2 = api.task.StubTask()
-            op3 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2})
+            op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
             graph.sequence(op1, op2, op3)
 
         @workflow
@@ -229,7 +229,7 @@ class TestCancel(BaseTest):
         @workflow
         def mock_workflow(ctx, graph):
             operations = (
-                self._op(ctx, func=mock_sleep_task, inputs=dict(seconds=0.1))
+                self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1))
                 for _ in range(number_of_tasks)
             )
             return graph.sequence(*operations)
@@ -270,7 +270,7 @@ class TestRetries(BaseTest):
        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(ctx, func=mock_conditional_failure_task,
-                          inputs={'failure_count': 1},
+                          arguments={'failure_count': 1},
                           max_attempts=2)
            graph.add_tasks(op)
        self._execute(
@@ -286,7 +286,7 @@ class TestRetries(BaseTest):
        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(ctx, func=mock_conditional_failure_task,
-                          inputs={'failure_count': 2},
+                          arguments={'failure_count': 2},
                           max_attempts=2)
            graph.add_tasks(op)
        with pytest.raises(exceptions.ExecutorException):
@@ -303,7 +303,7 @@ class TestRetries(BaseTest):
        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(ctx, func=mock_conditional_failure_task,
-                          inputs={'failure_count': 1},
+                          arguments={'failure_count': 1},
                           max_attempts=3)
            graph.add_tasks(op)
        self._execute(
@@ -319,7 +319,7 @@ class TestRetries(BaseTest):
        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(ctx, func=mock_conditional_failure_task,
-                          inputs={'failure_count': 2},
+                          arguments={'failure_count': 2},
                           max_attempts=3)
            graph.add_tasks(op)
        self._execute(
@@ -335,7 +335,7 @@ class TestRetries(BaseTest):
        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(ctx, func=mock_conditional_failure_task,
-                          inputs={'failure_count': 1},
+                          arguments={'failure_count': 1},
                           max_attempts=-1)
            graph.add_tasks(op)
        self._execute(
@@ -361,7 +361,7 @@ class TestRetries(BaseTest):
        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(ctx, func=mock_conditional_failure_task,
-                          inputs={'failure_count': 1},
+                          arguments={'failure_count': 1},
                           max_attempts=2,
                           retry_interval=retry_interval)
            graph.add_tasks(op)
@@ -382,7 +382,7 @@ class TestRetries(BaseTest):
        def mock_workflow(ctx, graph):
            op = self._op(ctx, func=mock_conditional_failure_task,
                          ignore_failure=True,
-                          inputs={'failure_count': 100},
+                          arguments={'failure_count': 100},
                          max_attempts=100)
            graph.add_tasks(op)
        self._execute(
@@ -405,7 +405,7 @@ class TestTaskRetryAndAbort(BaseTest):
        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(ctx, func=mock_task_retry,
-                          inputs={'message': self.message},
+                          arguments={'message': self.message},
                          retry_interval=default_retry_interval,
                          max_attempts=2)
            graph.add_tasks(op)
@@ -429,8 +429,8 @@ class TestTaskRetryAndAbort(BaseTest):
        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(ctx, func=mock_task_retry,
-                          inputs={'message': self.message,
-                                  'retry_interval': custom_retry_interval},
+                          arguments={'message': self.message,
+                                     'retry_interval': custom_retry_interval},
                          retry_interval=default_retry_interval,
                          max_attempts=2)
            graph.add_tasks(op)
@@ -452,7 +452,7 @@ class TestTaskRetryAndAbort(BaseTest):
        @workflow
        def mock_workflow(ctx, graph):
            op = self._op(ctx, func=mock_task_abort,
-                          inputs={'message': self.message},
+                          arguments={'message': self.message},
                          retry_interval=100,
                          max_attempts=100)
            graph.add_tasks(op)
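The comment added in _op above states the new contract explicitly: an operation has to declare its arguments before a task may pass values for them. A minimal sketch of that declare-then-pass pattern, reusing the test helper shown above with node and graph assumed in scope inside a @workflow function (the dotted path and argument names are illustrative):

    # 1) declare the operation, including its arguments, on an interface
    interface = mock.models.create_interface(
        node.service,
        'aria.interfaces.lifecycle',
        'create',
        operation_kwargs=dict(
            function='my_module.my_create_operation',  # illustrative dotted path
            arguments={'counter': 0}))                 # declaration, with a default
    node.interfaces[interface.name] = interface

    # 2) only then may a task pass a value for the declared argument
    op = api.task.OperationTask(
        node,
        interface_name='aria.interfaces.lifecycle',
        operation_name='create',
        arguments={'counter': 1})                      # per-task value
    graph.add_tasks(op)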
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 1ba6422..a717e19 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -103,9 +103,9 @@ class TestOperationTask(object):
         assert storage_task.actor == core_task.context.node._original_model
         assert core_task.model_task == storage_task
         assert core_task.name == api_task.name
-        assert core_task.implementation == api_task.implementation
+        assert core_task.function == api_task.function
         assert core_task.actor == api_task.actor == node
-        assert core_task.inputs == api_task.inputs == storage_task.inputs
+        assert core_task.arguments == api_task.arguments == storage_task.arguments
         assert core_task.plugin == storage_plugin
 
     def test_relationship_operation_task_creation(self, ctx):
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 1633d4a..5dd2855 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -108,9 +108,9 @@ def test_task_graph_into_execution_graph(tmpdir):
 def _assert_execution_is_api_task(execution_task, api_task):
     assert execution_task.id == api_task.id
     assert execution_task.name == api_task.name
-    assert execution_task.implementation == api_task.implementation
+    assert execution_task.function == api_task.function
     assert execution_task.actor == api_task.actor
-    assert execution_task.inputs == api_task.inputs
+    assert execution_task.arguments == api_task.arguments
 
 
 def _get_task_by_name(task_name, graph):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 41c4b2e..ac6d325 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -25,11 +25,11 @@ class MockTask(object):
 
     INFINITE_RETRIES = models.Task.INFINITE_RETRIES
 
-    def __init__(self, implementation, inputs=None, plugin=None, storage=None):
-        self.implementation = self.name = implementation
+    def __init__(self, function, arguments=None, plugin=None, storage=None):
+        self.function = self.name = function
         self.plugin_fk = plugin.id if plugin else None
         self.plugin = plugin or None
-        self.inputs = inputs or {}
+        self.arguments = arguments or {}
         self.states = []
         self.exception = None
         self.id = str(uuid.uuid4())
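With this change the MockTask helper mirrors the model rename: the first positional parameter is the dotted path of the function, and arguments replaces inputs. Constructing one in a test would look like this (the dotted path is illustrative):

    task = MockTask('tests.my_module.my_mock_operation')
    task_with_args = MockTask(
        'tests.my_module.my_mock_operation',
        arguments={'input': models.Parameter.wrap('input', 'value')})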
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 29cb0e8..9ddaef4 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -38,16 +38,16 @@ import tests
 from . import MockTask
 
 
-def _get_implementation(func):
+def _get_function(func):
     return '{module}.{func.__name__}'.format(module=__name__, func=func)
 
 
 def execute_and_assert(executor, storage=None):
     expected_value = 'value'
-    successful_task = MockTask(_get_implementation(mock_successful_task), storage=storage)
-    failing_task = MockTask(_get_implementation(mock_failing_task), storage=storage)
-    task_with_inputs = MockTask(_get_implementation(mock_task_with_input),
-                                inputs={'input': models.Parameter.wrap('input', 'value')},
+    successful_task = MockTask(_get_function(mock_successful_task), storage=storage)
+    failing_task = MockTask(_get_function(mock_failing_task), storage=storage)
+    task_with_inputs = MockTask(_get_function(mock_task_with_input),
+                                arguments={'input': models.Parameter.wrap('input', 'value')},
                                 storage=storage)
 
     for task in [successful_task, failing_task, task_with_inputs]:
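Note that task arguments are Parameter models rather than bare values. wrap builds one from a name and a value, and, as the dict(...) comprehensions elsewhere in this patch imply, unwrap returns a name/value pair (a sketch based on the calls above):

    param = models.Parameter.wrap('input', 'value')
    assert param.value == 'value'
    # unwrap() yields a (name, value) pair, which is why the code can write:
    # dict(arg.unwrap() for arg in task.arguments.values())
    name, value = param.unwrap()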
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index e6333e8..058190e 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -66,7 +66,7 @@ class TestProcessExecutor(object):
     def test_closed(self, executor):
         executor.close()
         with pytest.raises(RuntimeError) as exc_info:
-            executor.execute(task=MockTask(implementation='some.implementation'))
+            executor.execute(task=MockTask(function='some.function'))
         assert 'closed' in exc_info.value.message

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 8c3f72a..6163c09 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -93,12 +93,12 @@ def _test(context, executor, lock_files, func, dataholder, expected_failure):
             node,
             interface_name=interface_name,
             operation_name=operation_name,
-            inputs=arguments),
+            arguments=arguments),
         api.task.OperationTask(
             node,
             interface_name=interface_name,
             operation_name=operation_name,
-            inputs=arguments)
+            arguments=arguments)
     )
 
     signal = events.on_failure_task_signal

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index acca0bf..e4944df 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -27,7 +27,7 @@ from tests import storage
 
 
 def test_decorate_extension(context, executor):
-    arguments = {'input1': 1, 'input2': 2}
+    arguments = {'arg1': 1, 'arg2': 2}
 
     def get_node(ctx):
         return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
@@ -49,15 +49,15 @@ def test_decorate_extension(context, executor):
             node,
             interface_name=interface_name,
             operation_name=operation_name,
-            inputs=arguments)
+            arguments=arguments)
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
     eng.execute()
     out = get_node(context).attributes.get('out').value
-    assert out['wrapper_inputs'] == arguments
-    assert out['function_inputs'] == arguments
+    assert out['wrapper_arguments'] == arguments
+    assert out['function_arguments'] == arguments
 
 
 @extension.process_executor
@@ -65,16 +65,16 @@ class MockProcessExecutorExtension(object):
 
     def decorate(self):
         def decorator(function):
-            def wrapper(ctx, **operation_inputs):
-                ctx.node.attributes['out'] = {'wrapper_inputs': operation_inputs}
-                function(ctx=ctx, **operation_inputs)
+            def wrapper(ctx, **operation_arguments):
+                ctx.node.attributes['out'] = {'wrapper_arguments': operation_arguments}
+                function(ctx=ctx, **operation_arguments)
             return wrapper
         return decorator
 
 
 @operation
-def _mock_operation(ctx, **operation_inputs):
-    ctx.node.attributes['out']['function_inputs'] = operation_inputs
+def _mock_operation(ctx, **operation_arguments):
+    ctx.node.attributes['out']['function_arguments'] = operation_arguments
 
 
 @pytest.fixture
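For clarity, the class above illustrates the process executor extension contract: decorate() returns a decorator, and the wrapper that decorator produces receives exactly the keyword arguments destined for the operation function, which is why the test can assert that wrapper_arguments and function_arguments are equal. A no-op variant would look like this (class name illustrative; 'extension' as imported at the top of the test module above):

    @extension.process_executor
    class PassthroughExtension(object):

        def decorate(self):
            def decorator(function):
                def wrapper(ctx, **operation_arguments):
                    # forward the call unchanged; a real extension could
                    # inspect or augment operation_arguments here
                    return function(ctx=ctx, **operation_arguments)
                return wrapper
            return decorator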
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdec01ab/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index c766fe4..2d80a3b 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -62,19 +62,19 @@ def test_refresh_state_of_tracked_attributes(context, executor):
 
 def test_apply_tracked_changes_during_an_operation(context, executor):
-    inputs = {
+    arguments = {
         'committed': {'some': 'new', 'properties': 'right here'},
         'changed_but_refreshed': {'some': 'newer', 'properties': 'right there'}
     }
 
     expected_initial = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes
     out = _run_workflow(
-        context=context, executor=executor, op_func=_mock_updating_operation, inputs=inputs)
+        context=context, executor=executor, op_func=_mock_updating_operation, arguments=arguments)
 
     expected_after_update = expected_initial.copy()
-    expected_after_update.update(inputs['committed'])  # pylint: disable=no-member
+    expected_after_update.update(arguments['committed'])  # pylint: disable=no-member
     expected_after_change = expected_after_update.copy()
-    expected_after_change.update(inputs['changed_but_refreshed'])  # pylint: disable=no-member
+    expected_after_change.update(arguments['changed_but_refreshed'])  # pylint: disable=no-member
 
     assert out['initial'] == expected_initial
     assert out['after_update'] == expected_after_update
@@ -82,13 +82,13 @@ def test_apply_tracked_changes_during_an_operation(context, executor):
     assert out['after_refresh'] == expected_after_change
 
 
-def _run_workflow(context, executor, op_func, inputs=None):
+def _run_workflow(context, executor, op_func, arguments=None):
     @workflow
     def mock_workflow(ctx, graph):
         node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
         interface_name = 'test_interface'
         operation_name = 'operation'
-        wf_arguments = inputs or {}
+        wf_arguments = arguments or {}
         interface = mock.models.create_interface(
             ctx.service,
             interface_name,
@@ -101,7 +101,7 @@ def _run_workflow(context, executor, op_func, inputs=None):
             node,
             interface_name=interface_name,
             operation_name=operation_name,
-            inputs=wf_arguments)
+            arguments=wf_arguments)
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter