Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id C0247200C52 for ; Mon, 10 Apr 2017 13:18:18 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id BEC4C160B99; Mon, 10 Apr 2017 11:18:18 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 91939160B85 for ; Mon, 10 Apr 2017 13:18:17 +0200 (CEST) Received: (qmail 11033 invoked by uid 500); 10 Apr 2017 11:18:16 -0000 Mailing-List: contact dev-help@ariatosca.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ariatosca.incubator.apache.org Delivered-To: mailing list dev@ariatosca.incubator.apache.org Received: (qmail 11022 invoked by uid 99); 10 Apr 2017 11:18:16 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Apr 2017 11:18:16 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 59636180684 for ; Mon, 10 Apr 2017 11:18:16 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id 0ofXB4WbvvYx for ; Mon, 10 Apr 2017 11:18:12 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id F34F05F1F4 for ; Mon, 10 Apr 2017 11:18:10 +0000 (UTC) Received: (qmail 11017 invoked by uid 99); 10 Apr 2017 11:18:10 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 10 Apr 2017 11:18:10 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 27DA3DFF36; Mon, 10 Apr 2017 11:18:10 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: avia@apache.org To: dev@ariatosca.incubator.apache.org Message-Id: <440cedd9ecd24212856ff3e67e1e3787@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-ariatosca git commit: added tests for workflow runner Date: Mon, 10 Apr 2017 11:18:10 +0000 (UTC) archived-at: Mon, 10 Apr 2017 11:18:18 -0000 Repository: incubator-ariatosca Updated Branches: refs/heads/cli-tests e898e1064 -> 5b245b4a6 added tests for workflow runner Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5b245b4a Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5b245b4a Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5b245b4a Branch: refs/heads/cli-tests Commit: 5b245b4a6ff56f392601aa55ca6c4c5fb311bb38 Parents: e898e10 Author: Ran Ziv Authored: Sun Apr 9 17:30:20 2017 +0300 Committer: Ran Ziv Committed: Sun Apr 9 17:30:20 2017 +0300 ---------------------------------------------------------------------- aria/modeling/exceptions.py | 2 +- aria/modeling/utils.py | 2 +- aria/orchestrator/workflow_runner.py | 30 +-- aria/utils/type.py | 9 +- tests/mock/workflow.py | 26 +++ tests/orchestrator/test_workflow_runner.py | 293 ++++++++++++++++++++++++ 6 files changed, 341 insertions(+), 21 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/aria/modeling/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/modeling/exceptions.py b/aria/modeling/exceptions.py index f699560..8225f37 100644 --- a/aria/modeling/exceptions.py +++ b/aria/modeling/exceptions.py @@ -40,7 +40,7 @@ class MissingRequiredInputsException(ModelingException): """ -class InputOfWrongTypeException(ModelingException): +class InputsOfWrongTypeException(ModelingException): """ ARIA modeling exception: Inputs of the wrong types have been provided """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/aria/modeling/utils.py ---------------------------------------------------------------------- diff --git a/aria/modeling/utils.py b/aria/modeling/utils.py index 34c2ac7..acae065 100644 --- a/aria/modeling/utils.py +++ b/aria/modeling/utils.py @@ -96,7 +96,7 @@ def _merge_and_validate_inputs(inputs, template_inputs): for param_name, param_type in wrong_type_inputs.iteritems(): error_message.write('Input "{0}" must be of type {1}\n'. format(param_name, param_type)) - raise exceptions.InputOfWrongTypeException(error_message.getvalue()) + raise exceptions.InputsOfWrongTypeException(error_message.getvalue()) undeclared_inputs = [input_name for input_name in inputs.keys() if input_name not in template_inputs] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index e2ed3cf..1cdf1de 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -26,8 +26,8 @@ from .context.workflow import WorkflowContext from .workflows.builtin import BUILTIN_WORKFLOWS, BUILTIN_WORKFLOWS_PATH_PREFIX from .workflows.core.engine import Engine from .workflows.executor.process import ProcessExecutor -from ..modeling import utils as modeling_utils from ..modeling import models +from ..modeling import utils as modeling_utils from ..utils.imports import import_fullname @@ -100,8 +100,6 @@ class WorkflowRunner(object): return self._model_storage.service.get(self._service_id) def execute(self): - #TODO uncomment, commented for testing purposes - # self._validate_no_active_executions() self._engine.execute() def cancel(self): @@ -114,15 +112,16 @@ class WorkflowRunner(object): workflow_name=self._workflow_name, inputs={}) - # built-in workflows don't have any inputs, and are also - # not a part of the service's workflows field - if self._workflow_name not in BUILTIN_WORKFLOWS: - workflow_inputs = {k: v for k, v in - self.service.workflows[self._workflow_name].inputs - if k not in WORKFLOW_POLICY_INTERNAL_PROPERTIES} - - execution.inputs = modeling_utils.create_inputs(inputs, workflow_inputs) - + if self._workflow_name in BUILTIN_WORKFLOWS: + workflow_inputs = dict() # built-in workflows don't have any inputs + else: + workflow_inputs = dict((k, v) for k, v in + self.service.workflows[self._workflow_name].inputs.iteritems() + if k not in WORKFLOW_POLICY_INTERNAL_PROPERTIES) + + execution.inputs = modeling_utils.create_inputs(inputs, workflow_inputs) + # TODO: these two following calls should execute atomically + self._validate_no_active_executions(execution) self._model_storage.execution.put(execution) return execution @@ -133,11 +132,12 @@ class WorkflowRunner(object): 'No workflow policy {0} declared in service {1}' .format(self._workflow_name, self.service.name)) - def _validate_no_active_executions(self): - active_executions = [e for e in self.service.executions if e.is_active()] + def _validate_no_active_executions(self, execution): + active_executions = [e for e in self.service.executions + if e.id != execution.id and e.is_active()] if active_executions: raise exceptions.ActiveExecutionsError( - "Can't start execution; Service {0} has a running execution with id {1}" + "Can't start execution; Service {0} has an active execution with id {1}" .format(self.service.name, active_executions[0].id)) def _get_workflow_fn(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/aria/utils/type.py ---------------------------------------------------------------------- diff --git a/aria/utils/type.py b/aria/utils/type.py index abcf422..fff0f2a 100644 --- a/aria/utils/type.py +++ b/aria/utils/type.py @@ -36,11 +36,12 @@ def validate_value_type(value, type_name): 'float': float } - type = name_to_type.get(type_name.lower()) - if type is None: + type_ = name_to_type.get(type_name.lower()) + if type_ is None: raise RuntimeError('No supported type_name was provided') - # validating value type - ValueError will be raised on type mismatch - type(value) + + if type(value) != type_: + raise ValueError('Value {0} is not of type {1}'.format(value, type_name)) def convert_value_to_type(str_value, type_name): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/tests/mock/workflow.py ---------------------------------------------------------------------- diff --git a/tests/mock/workflow.py b/tests/mock/workflow.py new file mode 100644 index 0000000..b12b9fa --- /dev/null +++ b/tests/mock/workflow.py @@ -0,0 +1,26 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json + +from aria.orchestrator.decorators import workflow + + +@workflow +def mock_workflow(graph, ctx, output_path=None, **kwargs): # pylint: disable=unused-argument + if output_path: + # writes call arguments to the specified output file + with open(output_path, 'w') as f: + json.dump(kwargs, f) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5b245b4a/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py new file mode 100644 index 0000000..aa89ac5 --- /dev/null +++ b/tests/orchestrator/test_workflow_runner.py @@ -0,0 +1,293 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import json +from datetime import datetime + +import pytest +import mock + +from aria.modeling import exceptions as modeling_exceptions +from aria.modeling import models +from aria.orchestrator import exceptions +from aria.orchestrator.workflow_runner import WorkflowRunner +from aria.orchestrator.workflows.executor.process import ProcessExecutor + +from ..mock import ( + topology, + workflow as workflow_mocks +) +from ..fixtures import ( # pylint: disable=unused-import + plugins_dir, + plugin_manager, + fs_model as model, + resource_storage as resource +) + + +def test_undeclared_workflow(request): + # validating a proper error is raised when the workflow is not declared in the service + with pytest.raises(exceptions.UndeclaredWorkflowError): + _create_workflow_runner(request, 'undeclared_workflow') + + +def test_missing_workflow_implementation(service, request): + # validating a proper error is raised when the workflow code path does not exist + workflow = models.Operation( + name='test_workflow', + service=service, + implementation='nonexistent.workflow.implementation', + inputs={}) + service.workflows['test_workflow'] = workflow + + with pytest.raises(exceptions.WorkflowImplementationNotFoundError): + _create_workflow_runner(request, 'test_workflow') + + +def test_builtin_workflow_instantiation(request): + # validates the workflow runner instantiates properly when provided with a builtin workflow + # (expecting no errors to be raised on undeclared workflow or missing workflow implementation) + workflow_runner = _create_workflow_runner(request, 'install') + tasks = list(workflow_runner._tasks_graph.tasks) + assert len(tasks) == 2 # expecting two WorkflowTasks + + +def test_custom_workflow_instantiation(request): + # validates the workflow runner instantiates properly when provided with a custom workflow + # (expecting no errors to be raised on undeclared workflow or missing workflow implementation) + mock_workflow = _setup_mock_workflow_in_service(request) + workflow_runner = _create_workflow_runner(request, mock_workflow) + tasks = list(workflow_runner._tasks_graph.tasks) + assert len(tasks) == 0 # mock workflow creates no tasks + + +def test_existing_active_executions(request, service, model): + existing_active_execution = models.Execution( + service=service, + status=models.Execution.STARTED, + workflow_name='uninstall') + model.execution.put(existing_active_execution) + with pytest.raises(exceptions.ActiveExecutionsError): + _create_workflow_runner(request, 'install') + + +def test_existing_executions_but_no_active_ones(request, service, model): + existing_terminated_execution = models.Execution( + service=service, + status=models.Execution.TERMINATED, + workflow_name='uninstall') + model.execution.put(existing_terminated_execution) + # no active executions exist, so no error should be raised + _create_workflow_runner(request, 'install') + + +def test_default_executor(request): + # validates the ProcessExecutor is used by the workflow runner by default + mock_workflow = _setup_mock_workflow_in_service(request) + + with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls: + _create_workflow_runner(request, mock_workflow) + _, engine_kwargs = mock_engine_cls.call_args + assert isinstance(engine_kwargs.get('executor'), ProcessExecutor) + + +def test_custom_executor(request): + mock_workflow = _setup_mock_workflow_in_service(request) + + custom_executor = mock.MagicMock() + with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls: + _create_workflow_runner(request, mock_workflow, executor=custom_executor) + _, engine_kwargs = mock_engine_cls.call_args + assert engine_kwargs.get('executor') == custom_executor + + +def test_task_configuration_parameters(request): + mock_workflow = _setup_mock_workflow_in_service(request) + + task_max_attempts = 5 + task_retry_interval = 7 + with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls: + _create_workflow_runner(request, mock_workflow, task_max_attempts=task_max_attempts, + task_retry_interval=task_retry_interval) + _, engine_kwargs = mock_engine_cls.call_args + assert engine_kwargs['workflow_context']._task_max_attempts == task_max_attempts + assert engine_kwargs['workflow_context']._task_retry_interval == task_retry_interval + + +def test_execute(request, service): + mock_workflow = _setup_mock_workflow_in_service(request) + + mock_engine = mock.MagicMock() + with mock.patch('aria.orchestrator.workflow_runner.Engine', return_value=mock_engine) \ + as mock_engine_cls: + workflow_runner = _create_workflow_runner(request, mock_workflow) + + _, engine_kwargs = mock_engine_cls.call_args + assert engine_kwargs['workflow_context'].service.id == service.id + assert engine_kwargs['workflow_context'].execution.workflow_name == 'test_workflow' + + workflow_runner.execute() + mock_engine.execute.assert_called_once_with() + + +def test_cancel_execution(request): + mock_workflow = _setup_mock_workflow_in_service(request) + + mock_engine = mock.MagicMock() + with mock.patch('aria.orchestrator.workflow_runner.Engine', return_value=mock_engine): + workflow_runner = _create_workflow_runner(request, mock_workflow) + workflow_runner.cancel() + mock_engine.cancel_execution.assert_called_once_with() + + +def test_execution_model_creation(request, service, model): + mock_workflow = _setup_mock_workflow_in_service(request) + + with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls: + workflow_runner = _create_workflow_runner(request, mock_workflow) + + _, engine_kwargs = mock_engine_cls.call_args + assert engine_kwargs['workflow_context'].execution == workflow_runner.execution + assert model.execution.get(workflow_runner.execution.id) == workflow_runner.execution + assert workflow_runner.execution.service.id == service.id + assert workflow_runner.execution.workflow_name == mock_workflow + assert workflow_runner.execution.created_at <= datetime.utcnow() + assert workflow_runner.execution.inputs == dict() + + +def test_execution_inputs_override_workflow_inputs(request): + wf_inputs = {'input1': 'value1', 'input2': 'value2', 'input3': 5} + mock_workflow = _setup_mock_workflow_in_service( + request, + inputs=dict((name, models.Parameter.wrap(name, val)) for name, val + in wf_inputs.iteritems())) + + with mock.patch('aria.orchestrator.workflow_runner.Engine') as mock_engine_cls: + workflow_runner = _create_workflow_runner( + request, mock_workflow, inputs={'input2': 'overriding-value2', 'input3': 7}) + + _, engine_kwargs = mock_engine_cls.call_args + assert len(workflow_runner.execution.inputs) == 3 + # did not override input1 - expecting the default value from the workflow inputs + assert workflow_runner.execution.inputs['input1'].value == 'value1' + # overrode input2 + assert workflow_runner.execution.inputs['input2'].value == 'overriding-value2' + # overrode input of integer type + assert workflow_runner.execution.inputs['input3'].value == 7 + + +def test_execution_inputs_undeclared_inputs(request): + mock_workflow = _setup_mock_workflow_in_service(request) + + with pytest.raises(modeling_exceptions.UndeclaredInputsException): + _create_workflow_runner(request, mock_workflow, inputs={'undeclared_input': 'value'}) + + +def test_execution_inputs_missing_required_inputs(request): + mock_workflow = _setup_mock_workflow_in_service( + request, inputs={'required_input': models.Parameter.wrap('required_input', value=None)}) + + with pytest.raises(modeling_exceptions.MissingRequiredInputsException): + _create_workflow_runner(request, mock_workflow, inputs={}) + + +def test_execution_inputs_wrong_type_inputs(request): + mock_workflow = _setup_mock_workflow_in_service( + request, inputs={'input': models.Parameter.wrap('input', 'value')}) + + with pytest.raises(modeling_exceptions.InputsOfWrongTypeException): + _create_workflow_runner(request, mock_workflow, inputs={'input': 5}) + + +def test_execution_inputs_builtin_workflow_with_inputs(request): + # built-in workflows don't have inputs + with pytest.raises(modeling_exceptions.UndeclaredInputsException): + _create_workflow_runner(request, 'install', inputs={'undeclared_input': 'value'}) + + +def test_workflow_function_parameters(request, tmpdir): + # validating the workflow function is passed with the + # merged execution inputs, in dict form + + # the workflow function parameters will be written to this file + output_path = str(tmpdir.join('output')) + wf_inputs = {'output_path': output_path, 'input1': 'value1', 'input2': 'value2', 'input3': 5} + + mock_workflow = _setup_mock_workflow_in_service( + request, inputs=dict((name, models.Parameter.wrap(name, val)) for name, val + in wf_inputs.iteritems())) + + _create_workflow_runner(request, mock_workflow, + inputs={'input2': 'overriding-value2', 'input3': 7}) + + with open(output_path) as f: + wf_call_kwargs = json.load(f) + assert len(wf_call_kwargs) == 3 + assert wf_call_kwargs.get('input1') == 'value1' + assert wf_call_kwargs.get('input2') == 'overriding-value2' + assert wf_call_kwargs.get('input3') == 7 + + +@pytest.fixture +def service(model): + # sets up a service in the storage + service_id = topology.create_simple_topology_two_nodes(model) + service = model.service.get(service_id) + return service + + +def _setup_mock_workflow_in_service(request, inputs=None): + # sets up a mock workflow as part of the service, including uploading + # the workflow code to the service's dir on the resource storage + service = request.getfuncargvalue('service') + resource = request.getfuncargvalue('resource') + + source = workflow_mocks.__file__ + resource.service_template.upload(str(service.service_template.id), source) + mock_workflow_name = 'test_workflow' + workflow = models.Operation( + name=mock_workflow_name, + service=service, + implementation='workflow.mock_workflow', + inputs=inputs or {}) + service.workflows[mock_workflow_name] = workflow + return mock_workflow_name + + +def _create_workflow_runner(request, workflow_name, inputs=None, executor=None, + task_max_attempts=None, task_retry_interval=None): + # helper method for instantiating a workflow runner + service_id = request.getfuncargvalue('service').id + model = request.getfuncargvalue('model') + resource = request.getfuncargvalue('resource') + plugin_manager = request.getfuncargvalue('plugin_manager') + + # task configuration parameters can't be set to None, therefore only + # passing those if they've been set by the test + task_configuration_kwargs = dict() + if task_max_attempts is not None: + task_configuration_kwargs['task_max_attempts'] = task_max_attempts + if task_retry_interval is not None: + task_configuration_kwargs['task_retry_interval'] = task_retry_interval + + return WorkflowRunner( + workflow_name=workflow_name, + service_id=service_id, + inputs=inputs or {}, + executor=executor, + model_storage=model, + resource_storage=resource, + plugin_manager=plugin_manager, + **task_configuration_kwargs)