Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E59AC200CA7 for ; Wed, 14 Jun 2017 16:33:44 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E41B7160BE8; Wed, 14 Jun 2017 14:33:44 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id B7A6A160BDB for ; Wed, 14 Jun 2017 16:33:42 +0200 (CEST) Received: (qmail 55476 invoked by uid 500); 14 Jun 2017 14:33:42 -0000 Mailing-List: contact dev-help@ariatosca.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ariatosca.incubator.apache.org Delivered-To: mailing list dev@ariatosca.incubator.apache.org Received: (qmail 55464 invoked by uid 99); 14 Jun 2017 14:33:41 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd2-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Jun 2017 14:33:41 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd2-us-west.apache.org (ASF Mail Server at spamd2-us-west.apache.org) with ESMTP id 6A8F41AFD12 for ; Wed, 14 Jun 2017 14:33:41 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd2-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.221 X-Spam-Level: X-Spam-Status: No, score=-4.221 tagged_above=-999 required=6.31 tests=[HK_RANDOM_FROM=0.001, KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd2-us-west.apache.org [10.40.0.9]) (amavisd-new, port 10024) with ESMTP id zQfIxOn-vCKq for ; Wed, 14 Jun 2017 14:33:31 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 78E945F659 for ; Wed, 14 Jun 2017 14:33:29 +0000 (UTC) Received: (qmail 54883 invoked by uid 99); 14 Jun 2017 14:33:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 14 Jun 2017 14:33:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id B087FE01C3; Wed, 14 Jun 2017 14:33:27 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mxmrlv@apache.org To: dev@ariatosca.incubator.apache.org Date: Wed, 14 Jun 2017 14:33:29 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [3/5] incubator-ariatosca git commit: ARIA-276 Support model instrumentation for workflows archived-at: Wed, 14 Jun 2017 14:33:45 -0000 ARIA-276 Support model instrumentation for workflows Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/2149a5ee Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/2149a5ee Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/2149a5ee Branch: refs/heads/ARIA-278-Remove-core-tasks Commit: 2149a5ee0c4656a253f54db20f279197961588c1 Parents: 1e883c5 Author: max-orlov Authored: Thu Jun 8 09:52:31 2017 +0300 Committer: max-orlov Committed: Tue Jun 13 14:34:41 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/context/common.py | 7 + aria/orchestrator/context/operation.py | 7 - aria/orchestrator/decorators.py | 5 +- aria/orchestrator/workflows/api/task.py | 2 - aria/orchestrator/workflows/core/task.py | 12 +- aria/storage/collection_instrumentation.py | 46 +-- .../context/test_collection_instrumentation.py | 325 ------------------- .../context/test_context_instrumentation.py | 108 ++++++ tests/orchestrator/context/test_serialize.py | 20 +- tests/orchestrator/context/test_workflow.py | 93 ++++-- .../orchestrator/execution_plugin/test_local.py | 26 +- tests/orchestrator/execution_plugin/test_ssh.py | 50 +-- .../workflows/builtin/test_execute_operation.py | 9 +- .../orchestrator/workflows/core/test_engine.py | 88 +++-- .../executor/test_process_executor_extension.py | 24 +- .../test_process_executor_tracked_changes.py | 26 +- .../storage/test_collection_instrumentation.py | 257 +++++++++++++++ 17 files changed, 627 insertions(+), 478 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index c98e026..f4df317 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -36,6 +36,13 @@ class BaseContext(object): Base context object for workflow and operation """ + INSTRUMENTATION_FIELDS = ( + modeling.models.Node.attributes, + modeling.models.Node.properties, + modeling.models.NodeTemplate.attributes, + modeling.models.NodeTemplate.properties + ) + class PrefixedLogger(object): def __init__(self, base_logger, task_id=None): self._logger = base_logger http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index af7220d..efdc04d 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -29,13 +29,6 @@ class BaseOperationContext(common.BaseContext): Context object used during operation creation and execution """ - INSTRUMENTATION_FIELDS = ( - aria.modeling.models.Node.attributes, - aria.modeling.models.Node.properties, - aria.modeling.models.NodeTemplate.attributes, - aria.modeling.models.NodeTemplate.properties - ) - def __init__(self, task_id, actor_id, **kwargs): self._task_id = task_id self._actor_id = actor_id http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/orchestrator/decorators.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py index 80f6962..389bfb8 100644 --- a/aria/orchestrator/decorators.py +++ b/aria/orchestrator/decorators.py @@ -49,8 +49,9 @@ def workflow(func=None, suffix_template=''): workflow_parameters.setdefault('ctx', ctx) workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name)) validate_function_arguments(func, workflow_parameters) - with context.workflow.current.push(ctx): - func(**workflow_parameters) + with ctx.model.instrument(*ctx.INSTRUMENTATION_FIELDS): + with context.workflow.current.push(ctx): + func(**workflow_parameters) return workflow_parameters['graph'] return _wrapper http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index bcba56e..ca125a8 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -108,8 +108,6 @@ class OperationTask(BaseTask): ``interface_name`` and ``operation_name`` to not refer to an operation on the actor """ - assert isinstance(actor, (models.Node, models.Relationship)) - # Creating OperationTask directly should raise an error when there is no # interface/operation. if not has_operation(actor, interface_name, operation_name): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 72d83ea..d732f09 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -124,20 +124,22 @@ class OperationTask(BaseTask): self.operation_name = api_task.operation_name model_storage = api_task._workflow_context.model + actor = getattr(api_task.actor, '_wrapped', api_task.actor) + base_task_model = model_storage.task.model_cls - if isinstance(api_task.actor, models.Node): + if isinstance(actor, models.Node): context_cls = operation_context.NodeOperationContext create_task_model = base_task_model.for_node - elif isinstance(api_task.actor, models.Relationship): + elif isinstance(actor, models.Relationship): context_cls = operation_context.RelationshipOperationContext create_task_model = base_task_model.for_relationship else: raise RuntimeError('No operation context could be created for {actor.model_cls}' - .format(actor=api_task.actor)) + .format(actor=actor)) task_model = create_task_model( name=api_task.name, - actor=api_task.actor, + actor=actor, status=base_task_model.PENDING, max_attempts=api_task.max_attempts, retry_interval=api_task.retry_interval, @@ -156,7 +158,7 @@ class OperationTask(BaseTask): resource_storage=self._workflow_context.resource, service_id=self._workflow_context._service_id, task_id=task_model.id, - actor_id=api_task.actor.id, + actor_id=actor.id, execution_id=self._workflow_context._execution_id, workdir=self._workflow_context._workdir) self._task_id = task_model.id http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/aria/storage/collection_instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/collection_instrumentation.py b/aria/storage/collection_instrumentation.py index 27d8322..454f97a 100644 --- a/aria/storage/collection_instrumentation.py +++ b/aria/storage/collection_instrumentation.py @@ -198,23 +198,28 @@ class _InstrumentedList(_InstrumentedCollection, list): return list(self) -class _InstrumentedModel(object): +class _WrappedBase(object): - def __init__(self, original_model, mapi, instrumentation): + def __init__(self, wrapped, instrumentation): + self._wrapped = wrapped + self._instrumentation = instrumentation + + +class _InstrumentedModel(_WrappedBase): + + def __init__(self, mapi, *args, **kwargs): """ The original model - :param original_model: the model to be instrumented + :param wrapped: the model to be instrumented :param mapi: the mapi for that model """ - super(_InstrumentedModel, self).__init__() - self._original_model = original_model + super(_InstrumentedModel, self).__init__(*args, **kwargs) self._mapi = mapi - self._instrumentation = instrumentation self._apply_instrumentation() def __getattr__(self, item): - return_value = getattr(self._original_model, item) - if isinstance(return_value, self._original_model.__class__): + return_value = getattr(self._wrapped, item) + if isinstance(return_value, self._wrapped.__class__): return _create_instrumented_model(return_value, self._mapi, self._instrumentation) if isinstance(return_value, (list, dict)): return _create_wrapped_model(return_value, self._mapi, self._instrumentation) @@ -224,7 +229,7 @@ class _InstrumentedModel(object): for field in self._instrumentation: field_name = field.key field_cls = field.mapper.class_ - field = getattr(self._original_model, field_name) + field = getattr(self._wrapped, field_name) # Preserve the original value. e.g. original attributes would be located under # _attributes @@ -241,20 +246,20 @@ class _InstrumentedModel(object): "ARIA supports instrumentation for dict and list. Field {field} of the " "class {model} is of {type} type.".format( field=field, - model=self._original_model, + model=self._wrapped, type=type(field))) instrumented_class = instrumentation_cls(seq=field, - parent=self._original_model, + parent=self._wrapped, mapi=self._mapi, field_name=field_name, field_cls=field_cls) setattr(self, field_name, instrumented_class) -class _WrappedModel(object): +class _WrappedModel(_WrappedBase): - def __init__(self, wrapped, instrumentation, **kwargs): + def __init__(self, instrumentation_kwargs, *args, **kwargs): """ :param instrumented_cls: The class to be instrumented @@ -262,9 +267,8 @@ class _WrappedModel(object): :param wrapped: the currently wrapped instance :param kwargs: and kwargs to the passed to the instrumented class. """ - self._kwargs = kwargs - self._instrumentation = instrumentation - self._wrapped = wrapped + super(_WrappedModel, self).__init__(*args, **kwargs) + self._kwargs = instrumentation_kwargs def _wrap(self, value): if value.__class__ in (class_.class_ for class_ in self._instrumentation): @@ -286,16 +290,18 @@ class _WrappedModel(object): return self._wrap(self._wrapped[item]) -def _create_instrumented_model(original_model, mapi, instrumentation, **kwargs): +def _create_instrumented_model(original_model, mapi, instrumentation): return type('Instrumented{0}'.format(original_model.__class__.__name__), (_InstrumentedModel,), - {})(original_model, mapi, instrumentation, **kwargs) + {})(wrapped=original_model, instrumentation=instrumentation, mapi=mapi) -def _create_wrapped_model(original_model, mapi, instrumentation, **kwargs): +def _create_wrapped_model(original_model, mapi, instrumentation): return type('Wrapped{0}'.format(original_model.__class__.__name__), (_WrappedModel, ), - {})(original_model, instrumentation, mapi=mapi, **kwargs) + {})(wrapped=original_model, + instrumentation=instrumentation, + instrumentation_kwargs=dict(mapi=mapi)) def instrument(instrumentation, original_model, mapi): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/context/test_collection_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_collection_instrumentation.py b/tests/orchestrator/context/test_collection_instrumentation.py deleted file mode 100644 index ae3e8ac..0000000 --- a/tests/orchestrator/context/test_collection_instrumentation.py +++ /dev/null @@ -1,325 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import pytest - -from aria.modeling import models -from aria.storage import collection_instrumentation -from aria.orchestrator.context import operation - -from tests import ( - mock, - storage -) - - -class MockActor(object): - def __init__(self): - self.dict_ = {} - self.list_ = [] - - -class MockMAPI(object): - - def __init__(self): - pass - - def put(self, *args, **kwargs): - pass - - def update(self, *args, **kwargs): - pass - - -class CollectionInstrumentation(object): - - @pytest.fixture - def actor(self): - return MockActor() - - @pytest.fixture - def model(self): - return MockMAPI() - - @pytest.fixture - def dict_(self, actor, model): - return collection_instrumentation._InstrumentedDict(model, actor, 'dict_', models.Attribute) - - @pytest.fixture - def list_(self, actor, model): - return collection_instrumentation._InstrumentedList(model, actor, 'list_', models.Attribute) - - -class TestDict(CollectionInstrumentation): - - def test_keys(self, actor, dict_): - dict_.update( - { - 'key1': models.Attribute.wrap('key1', 'value1'), - 'key2': models.Attribute.wrap('key2', 'value2') - } - ) - assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys()) - - def test_values(self, actor, dict_): - dict_.update({ - 'key1': models.Attribute.wrap('key1', 'value1'), - 'key2': models.Attribute.wrap('key1', 'value2') - }) - assert (sorted(dict_.values()) == - sorted(['value1', 'value2']) == - sorted(v.value for v in actor.dict_.values())) - - def test_items(self, dict_): - dict_.update({ - 'key1': models.Attribute.wrap('key1', 'value1'), - 'key2': models.Attribute.wrap('key1', 'value2') - }) - assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')]) - - def test_iter(self, actor, dict_): - dict_.update({ - 'key1': models.Attribute.wrap('key1', 'value1'), - 'key2': models.Attribute.wrap('key1', 'value2') - }) - assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys()) - - def test_bool(self, dict_): - assert not dict_ - dict_.update({ - 'key1': models.Attribute.wrap('key1', 'value1'), - 'key2': models.Attribute.wrap('key1', 'value2') - }) - assert dict_ - - def test_set_item(self, actor, dict_): - dict_['key1'] = models.Attribute.wrap('key1', 'value1') - assert dict_['key1'] == 'value1' == actor.dict_['key1'].value - assert isinstance(actor.dict_['key1'], models.Attribute) - - def test_nested(self, actor, dict_): - dict_['key'] = {} - assert isinstance(actor.dict_['key'], models.Attribute) - assert dict_['key'] == actor.dict_['key'].value == {} - - dict_['key']['inner_key'] = 'value' - - assert len(dict_) == 1 - assert 'inner_key' in dict_['key'] - assert dict_['key']['inner_key'] == 'value' - assert dict_['key'].keys() == ['inner_key'] - assert dict_['key'].values() == ['value'] - assert dict_['key'].items() == [('inner_key', 'value')] - assert isinstance(actor.dict_['key'], models.Attribute) - assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict) - - dict_['key'].update({'updated_key': 'updated_value'}) - assert len(dict_) == 1 - assert 'updated_key' in dict_['key'] - assert dict_['key']['updated_key'] == 'updated_value' - assert sorted(dict_['key'].keys()) == sorted(['inner_key', 'updated_key']) - assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value']) - assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'), - ('updated_key', 'updated_value')]) - assert isinstance(actor.dict_['key'], models.Attribute) - assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict) - - dict_.update({'key': 'override_value'}) - assert len(dict_) == 1 - assert 'key' in dict_ - assert dict_['key'] == 'override_value' - assert len(actor.dict_) == 1 - assert isinstance(actor.dict_['key'], models.Attribute) - assert actor.dict_['key'].value == 'override_value' - - def test_get_item(self, actor, dict_): - dict_['key1'] = models.Attribute.wrap('key1', 'value1') - assert isinstance(actor.dict_['key1'], models.Attribute) - - def test_update(self, actor, dict_): - dict_['key1'] = 'value1' - - new_dict = {'key2': 'value2'} - dict_.update(new_dict) - assert len(dict_) == 2 - assert dict_['key2'] == 'value2' - assert isinstance(actor.dict_['key2'], models.Attribute) - - new_dict = {} - new_dict.update(dict_) - assert new_dict['key1'] == dict_['key1'] - - def test_copy(self, dict_): - dict_['key1'] = 'value1' - - new_dict = dict_.copy() - assert new_dict is not dict_ - assert new_dict == dict_ - - dict_['key1'] = 'value2' - assert new_dict['key1'] == 'value1' - assert dict_['key1'] == 'value2' - - def test_clear(self, dict_): - dict_['key1'] = 'value1' - dict_.clear() - - assert len(dict_) == 0 - - -class TestList(CollectionInstrumentation): - - def test_append(self, actor, list_): - list_.append(models.Attribute.wrap('name', 'value1')) - list_.append('value2') - assert len(actor.list_) == 2 - assert len(list_) == 2 - assert isinstance(actor.list_[0], models.Attribute) - assert list_[0] == 'value1' - - assert isinstance(actor.list_[1], models.Attribute) - assert list_[1] == 'value2' - - list_[0] = 'new_value1' - list_[1] = 'new_value2' - assert isinstance(actor.list_[1], models.Attribute) - assert isinstance(actor.list_[1], models.Attribute) - assert list_[0] == 'new_value1' - assert list_[1] == 'new_value2' - - def test_iter(self, list_): - list_.append('value1') - list_.append('value2') - assert sorted(list_) == sorted(['value1', 'value2']) - - def test_insert(self, actor, list_): - list_.append('value1') - list_.insert(0, 'value2') - list_.insert(2, 'value3') - list_.insert(10, 'value4') - assert sorted(list_) == sorted(['value1', 'value2', 'value3', 'value4']) - assert len(actor.list_) == 4 - - def test_set(self, list_): - list_.append('value1') - list_.append('value2') - - list_[1] = 'value3' - assert len(list_) == 2 - assert sorted(list_) == sorted(['value1', 'value3']) - - def test_insert_into_nested(self, actor, list_): - list_.append([]) - - list_[0].append('inner_item') - assert isinstance(actor.list_[0], models.Attribute) - assert len(list_) == 1 - assert list_[0][0] == 'inner_item' - - list_[0].append('new_item') - assert isinstance(actor.list_[0], models.Attribute) - assert len(list_) == 1 - assert list_[0][1] == 'new_item' - - assert list_[0] == ['inner_item', 'new_item'] - assert ['inner_item', 'new_item'] == list_[0] - - -class TestDictList(CollectionInstrumentation): - def test_dict_in_list(self, actor, list_): - list_.append({}) - assert len(list_) == 1 - assert isinstance(actor.list_[0], models.Attribute) - assert actor.list_[0].value == {} - - list_[0]['key'] = 'value' - assert list_[0]['key'] == 'value' - assert len(actor.list_) == 1 - assert isinstance(actor.list_[0], models.Attribute) - assert actor.list_[0].value['key'] == 'value' - - def test_list_in_dict(self, actor, dict_): - dict_['key'] = [] - assert len(dict_) == 1 - assert isinstance(actor.dict_['key'], models.Attribute) - assert actor.dict_['key'].value == [] - - dict_['key'].append('value') - assert dict_['key'][0] == 'value' - assert len(actor.dict_) == 1 - assert isinstance(actor.dict_['key'], models.Attribute) - assert actor.dict_['key'].value[0] == 'value' - - -class TestModelInstrumentation(object): - - @pytest.fixture - def workflow_ctx(self, tmpdir): - context = mock.context.simple(str(tmpdir), inmemory=True) - yield context - storage.release_sqlite_storage(context.model) - - def test_attributes_access(self, workflow_ctx): - node = workflow_ctx.model.node.list()[0] - task = models.Task(node=node) - workflow_ctx.model.task.put(task) - - ctx = operation.NodeOperationContext( - task.id, node.id, name='', service_id=workflow_ctx.model.service.list()[0].id, - model_storage=workflow_ctx.model, resource_storage=workflow_ctx.resource, - execution_id=1) - - def _run_assertions(is_under_ctx): - def ctx_assert(expr): - if is_under_ctx: - assert expr - else: - assert not expr - - ctx_assert(isinstance(ctx.node.attributes, - collection_instrumentation._InstrumentedDict)) - assert not isinstance(ctx.node.properties, - collection_instrumentation._InstrumentedCollection) - - for rel in ctx.node.inbound_relationships: - ctx_assert(isinstance(rel, collection_instrumentation._WrappedModel)) - ctx_assert(isinstance(rel.source_node.attributes, - collection_instrumentation._InstrumentedDict)) - ctx_assert(isinstance(rel.target_node.attributes, - collection_instrumentation._InstrumentedDict)) - - for node in ctx.model.node: - ctx_assert(isinstance(node.attributes, - collection_instrumentation._InstrumentedDict)) - assert not isinstance(node.properties, - collection_instrumentation._InstrumentedCollection) - - for rel in ctx.model.relationship: - ctx_assert(isinstance(rel, collection_instrumentation._WrappedModel)) - - ctx_assert(isinstance(rel.source_node.attributes, - collection_instrumentation._InstrumentedDict)) - ctx_assert(isinstance(rel.target_node.attributes, - collection_instrumentation._InstrumentedDict)) - - assert not isinstance(rel.source_node.properties, - collection_instrumentation._InstrumentedCollection) - assert not isinstance(rel.target_node.properties, - collection_instrumentation._InstrumentedCollection) - - with ctx.model.instrument(models.Node.attributes): - _run_assertions(True) - - _run_assertions(False) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/context/test_context_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_context_instrumentation.py b/tests/orchestrator/context/test_context_instrumentation.py new file mode 100644 index 0000000..6cc8096 --- /dev/null +++ b/tests/orchestrator/context/test_context_instrumentation.py @@ -0,0 +1,108 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.modeling import models +from aria.storage import collection_instrumentation +from aria.orchestrator.context import operation + +from tests import ( + mock, + storage +) + + +class TestContextInstrumentation(object): + + @pytest.fixture + def workflow_ctx(self, tmpdir): + context = mock.context.simple(str(tmpdir), inmemory=True) + yield context + storage.release_sqlite_storage(context.model) + + def test_workflow_context_instrumentation(self, workflow_ctx): + with workflow_ctx.model.instrument(models.Node.attributes): + self._run_common_assertions(workflow_ctx, True) + self._run_common_assertions(workflow_ctx, False) + + def test_operation_context_instrumentation(self, workflow_ctx): + node = workflow_ctx.model.node.list()[0] + task = models.Task(node=node) + workflow_ctx.model.task.put(task) + + ctx = operation.NodeOperationContext( + task.id, node.id, name='', service_id=workflow_ctx.model.service.list()[0].id, + model_storage=workflow_ctx.model, resource_storage=workflow_ctx.resource, + execution_id=1) + + with ctx.model.instrument(models.Node.attributes): + self._run_op_assertions(ctx, True) + self._run_common_assertions(ctx, True) + + self._run_op_assertions(ctx, False) + self._run_common_assertions(ctx, False) + + @staticmethod + def ctx_assert(expr, is_under_ctx): + if is_under_ctx: + assert expr + else: + assert not expr + + def _run_op_assertions(self, ctx, is_under_ctx): + self.ctx_assert(isinstance(ctx.node.attributes, + collection_instrumentation._InstrumentedDict), is_under_ctx) + assert not isinstance(ctx.node.properties, + collection_instrumentation._InstrumentedCollection) + + for rel in ctx.node.inbound_relationships: + self.ctx_assert( + isinstance(rel, collection_instrumentation._WrappedModel), is_under_ctx) + self.ctx_assert( + isinstance(rel.source_node.attributes, + collection_instrumentation._InstrumentedDict), + is_under_ctx) + self.ctx_assert( + isinstance(rel.target_node.attributes, + collection_instrumentation._InstrumentedDict), + is_under_ctx) + + def _run_common_assertions(self, ctx, is_under_ctx): + + for node in ctx.model.node: + self.ctx_assert( + isinstance(node.attributes, collection_instrumentation._InstrumentedDict), + is_under_ctx) + assert not isinstance(node.properties, + collection_instrumentation._InstrumentedCollection) + + for rel in ctx.model.relationship: + self.ctx_assert( + isinstance(rel, collection_instrumentation._WrappedModel), is_under_ctx) + + self.ctx_assert( + isinstance(rel.source_node.attributes, + collection_instrumentation._InstrumentedDict), + is_under_ctx) + self.ctx_assert( + isinstance(rel.target_node.attributes, + collection_instrumentation._InstrumentedDict), + is_under_ctx) + + assert not isinstance(rel.source_node.properties, + collection_instrumentation._InstrumentedCollection) + assert not isinstance(rel.target_node.properties, + collection_instrumentation._InstrumentedCollection) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index 4db7bf4..0919e81 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -33,16 +33,10 @@ def test_serialize_operation_context(context, executor, tmpdir): test_file.write(TEST_FILE_CONTENT) resource = context.resource resource.service_template.upload(TEST_FILE_ENTRY_ID, str(test_file)) - graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) - eng.execute() - -@workflow -def _mock_workflow(ctx, graph): - node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) plugin = mock.models.create_plugin() - ctx.model.plugin.put(plugin) + context.model.plugin.put(plugin) interface = mock.models.create_interface( node.service, 'test', @@ -51,6 +45,16 @@ def _mock_workflow(ctx, graph): plugin=plugin) ) node.interfaces[interface.name] = interface + context.model.node.update(node) + + graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter + eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph) + eng.execute() + + +@workflow +def _mock_workflow(ctx, graph): + node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) task = api.task.OperationTask(node, interface_name='test', operation_name='op') graph.add_tasks(task) return graph http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/context/test_workflow.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_workflow.py b/tests/orchestrator/context/test_workflow.py index 3c35435..6d53c2a 100644 --- a/tests/orchestrator/context/test_workflow.py +++ b/tests/orchestrator/context/test_workflow.py @@ -17,11 +17,14 @@ from datetime import datetime import pytest -from aria import application_model_storage +from aria import application_model_storage, workflow from aria.orchestrator import context from aria.storage import sql_mapi -from tests import storage as test_storage -from tests.mock import models +from aria.orchestrator.workflows.executor import thread, process + +from tests import storage as test_storage, ROOT_DIR +from ... import mock +from . import execute class TestWorkflowContext(object): @@ -30,10 +33,10 @@ class TestWorkflowContext(object): ctx = self._create_ctx(storage) execution = storage.execution.get(ctx.execution.id) # pylint: disable=no-member assert execution.service == storage.service.get_by_name( - models.SERVICE_NAME) - assert execution.workflow_name == models.WORKFLOW_NAME + mock.models.SERVICE_NAME) + assert execution.workflow_name == mock.models.WORKFLOW_NAME assert execution.service_template == storage.service_template.get_by_name( - models.SERVICE_TEMPLATE_NAME) + mock.models.SERVICE_TEMPLATE_NAME) assert execution.status == storage.execution.model_cls.PENDING assert execution.inputs == {} assert execution.created_at <= datetime.utcnow() @@ -49,27 +52,75 @@ class TestWorkflowContext(object): :param storage: :return WorkflowContext: """ - service = storage.service.get_by_name(models.SERVICE_NAME) + service = storage.service.get_by_name(mock.models.SERVICE_NAME) return context.workflow.WorkflowContext( name='simple_context', model_storage=storage, resource_storage=None, service_id=service, execution_id=storage.execution.list(filters=dict(service=service))[0].id, - workflow_name=models.WORKFLOW_NAME, - task_max_attempts=models.TASK_MAX_ATTEMPTS, - task_retry_interval=models.TASK_RETRY_INTERVAL + workflow_name=mock.models.WORKFLOW_NAME, + task_max_attempts=mock.models.TASK_MAX_ATTEMPTS, + task_retry_interval=mock.models.TASK_RETRY_INTERVAL ) + @pytest.fixture + def storage(self): + workflow_storage = application_model_storage( + sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage) + workflow_storage.service_template.put(mock.models.create_service_template()) + service_template = workflow_storage.service_template.get_by_name( + mock.models.SERVICE_TEMPLATE_NAME) + service = mock.models.create_service(service_template) + workflow_storage.service.put(service) + workflow_storage.execution.put(mock.models.create_execution(service)) + yield workflow_storage + test_storage.release_sqlite_storage(workflow_storage) + + +@pytest.fixture +def ctx(tmpdir): + context = mock.context.simple( + str(tmpdir), + context_kwargs=dict(workdir=str(tmpdir.join('workdir'))) + ) + yield context + test_storage.release_sqlite_storage(context.model) + + +@pytest.fixture(params=[ + (thread.ThreadExecutor, {}), + (process.ProcessExecutor, {'python_path': [ROOT_DIR]}), +]) +def executor(request): + executor_cls, executor_kwargs = request.param + result = executor_cls(**executor_kwargs) + try: + yield result + finally: + result.close() + + +def test_attribute_consumption(ctx, executor): + + node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) + node.attributes['key'] = ctx.model.attribute.model_cls.wrap('key', 'value') + node.attributes['key2'] = ctx.model.attribute.model_cls.wrap('key2', 'value_to_change') + ctx.model.node.update(node) + + assert node.attributes['key'].value == 'value' + assert node.attributes['key2'].value == 'value_to_change' + + @workflow + def basic_workflow(ctx, **_): + node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) + node.attributes['new_key'] = 'new_value' + node.attributes['key2'] = 'changed_value' + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) + node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME) -@pytest.fixture(scope='function') -def storage(): - workflow_storage = application_model_storage( - sql_mapi.SQLAlchemyModelAPI, initiator=test_storage.init_inmemory_model_storage) - workflow_storage.service_template.put(models.create_service_template()) - service_template = workflow_storage.service_template.get_by_name(models.SERVICE_TEMPLATE_NAME) - service = models.create_service(service_template) - workflow_storage.service.put(service) - workflow_storage.execution.put(models.create_execution(service)) - yield workflow_storage - test_storage.release_sqlite_storage(workflow_storage) + assert len(node.attributes) == 3 + assert node.attributes['key'].value == 'value' + assert node.attributes['new_key'].value == 'new_value' + assert node.attributes['key2'].value == 'changed_value' http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index d792a57..f667460 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -477,20 +477,22 @@ if __name__ == '__main__': 'input_as_env_var': env_var }) + node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface = mock.models.create_interface( + node.service, + 'test', + 'op', + operation_kwargs=dict( + function='{0}.{1}'.format( + operations.__name__, + operations.run_script_locally.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + workflow_context.model.node.update(node) + @workflow def mock_workflow(ctx, graph): - node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - interface = mock.models.create_interface( - node.service, - 'test', - 'op', - operation_kwargs=dict( - function='{0}.{1}'.format( - operations.__name__, - operations.run_script_locally.__name__), - arguments=arguments) - ) - node.interfaces[interface.name] = interface graph.add_tasks(api.task.OperationTask( node, interface_name='test', http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index 8b326e7..8c4dd2d 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -214,33 +214,33 @@ class TestWithActualSSHServer(object): else: operation = operations.run_script_with_ssh + node = self._workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + arguments = { + 'script_path': script_path, + 'fabric_env': _FABRIC_ENV, + 'process': process, + 'use_sudo': use_sudo, + 'custom_env_var': custom_input, + 'test_operation': '', + } + if hide_output: + arguments['hide_output'] = hide_output + if commands: + arguments['commands'] = commands + interface = mock.models.create_interface( + node.service, + 'test', + 'op', + operation_kwargs=dict( + function='{0}.{1}'.format( + operations.__name__, + operation.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + @workflow def mock_workflow(ctx, graph): - node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - arguments = { - 'script_path': script_path, - 'fabric_env': _FABRIC_ENV, - 'process': process, - 'use_sudo': use_sudo, - 'custom_env_var': custom_input, - 'test_operation': '', - } - if hide_output: - arguments['hide_output'] = hide_output - if commands: - arguments['commands'] = commands - interface = mock.models.create_interface( - node.service, - 'test', - 'op', - operation_kwargs=dict( - function='{0}.{1}'.format( - operations.__name__, - operation.__name__), - arguments=arguments) - ) - node.interfaces[interface.name] = interface - ops = [] for test_operation in test_operations: op_arguments = arguments.copy() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/workflows/builtin/test_execute_operation.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/builtin/test_execute_operation.py b/tests/orchestrator/workflows/builtin/test_execute_operation.py index 88818ca..8713e3c 100644 --- a/tests/orchestrator/workflows/builtin/test_execute_operation.py +++ b/tests/orchestrator/workflows/builtin/test_execute_operation.py @@ -56,12 +56,9 @@ def test_execute_operation(ctx): ) assert len(execute_tasks) == 1 - assert execute_tasks[0].name == task.OperationTask.NAME_FORMAT.format( - type='node', - name=node.name, - interface=interface_name, - operation=operation_name - ) + assert getattr(execute_tasks[0].actor, '_wrapped', execute_tasks[0].actor) == node + assert execute_tasks[0].operation_name == operation_name + assert execute_tasks[0].interface_name == interface_name # TODO: add more scenarios http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index 6d2836c..0438544 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -55,12 +55,7 @@ class BaseTest(object): tasks_graph=graph) @staticmethod - def _op(ctx, - func, - arguments=None, - max_attempts=None, - retry_interval=None, - ignore_failure=None): + def _create_interface(ctx, func, arguments=None): node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) interface_name = 'aria.interfaces.lifecycle' operation_kwargs = dict(function='{name}.{func.__name__}'.format( @@ -72,6 +67,17 @@ class BaseTest(object): interface = mock.models.create_interface(node.service, interface_name, operation_name, operation_kwargs=operation_kwargs) node.interfaces[interface.name] = interface + ctx.model.node.update(node) + + return node, interface_name, operation_name + + @staticmethod + def _op(node, + operation_name, + arguments=None, + max_attempts=None, + retry_interval=None, + ignore_failure=None): return api.task.OperationTask( node, @@ -158,9 +164,11 @@ class TestEngine(BaseTest): assert execution.status == models.Execution.SUCCEEDED def test_single_task_successful_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface(workflow_context, mock_success_task) + @workflow def mock_workflow(ctx, graph): - graph.add_tasks(self._op(ctx, func=mock_success_task)) + graph.add_tasks(self._op(node, operation_name)) self._execute( workflow_func=mock_workflow, workflow_context=workflow_context, @@ -170,9 +178,11 @@ class TestEngine(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 1 def test_single_task_failed_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface(workflow_context, mock_failed_task) + @workflow def mock_workflow(ctx, graph): - graph.add_tasks(self._op(ctx, func=mock_failed_task)) + graph.add_tasks(self._op(node, operation_name)) with pytest.raises(exceptions.ExecutorException): self._execute( workflow_func=mock_workflow, @@ -187,10 +197,13 @@ class TestEngine(BaseTest): assert execution.status == models.Execution.FAILED def test_two_tasks_execution_order(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_ordered_task, {'counter': 1}) + @workflow def mock_workflow(ctx, graph): - op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1}) - op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2}) + op1 = self._op(node, operation_name, arguments={'counter': 1}) + op2 = self._op(node, operation_name, arguments={'counter': 2}) graph.sequence(op1, op2) self._execute( workflow_func=mock_workflow, @@ -202,11 +215,14 @@ class TestEngine(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_stub_and_subworkflow_execution(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_ordered_task, {'counter': 1}) + @workflow def sub_workflow(ctx, graph): - op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1}) + op1 = self._op(node, operation_name, arguments={'counter': 1}) op2 = api.task.StubTask() - op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2}) + op3 = self._op(node, operation_name, arguments={'counter': 2}) graph.sequence(op1, op2, op3) @workflow @@ -225,11 +241,13 @@ class TestCancel(BaseTest): def test_cancel_started_execution(self, workflow_context, executor): number_of_tasks = 100 + node, _, operation_name = self._create_interface( + workflow_context, mock_sleep_task, {'seconds': 0.1}) @workflow def mock_workflow(ctx, graph): operations = ( - self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1)) + self._op(node, operation_name, arguments=dict(seconds=0.1)) for _ in range(number_of_tasks) ) return graph.sequence(*operations) @@ -267,9 +285,12 @@ class TestCancel(BaseTest): class TestRetries(BaseTest): def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 1}, max_attempts=2) graph.add_tasks(op) @@ -283,9 +304,12 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 2}, max_attempts=2) graph.add_tasks(op) @@ -300,9 +324,11 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 1}, max_attempts=3) graph.add_tasks(op) @@ -316,9 +342,12 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) + @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 2}, max_attempts=3) graph.add_tasks(op) @@ -332,9 +361,11 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 3 def test_infinite_retries(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 1}, max_attempts=-1) graph.add_tasks(op) @@ -358,9 +389,11 @@ class TestRetries(BaseTest): executor=executor) def _test_retry_interval(self, retry_interval, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, arguments={'failure_count': 1}, max_attempts=2, retry_interval=retry_interval) @@ -378,9 +411,11 @@ class TestRetries(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_ignore_failure(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_conditional_failure_task, {'failure_count': 1}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_conditional_failure_task, + op = self._op(node, operation_name, ignore_failure=True, arguments={'failure_count': 100}, max_attempts=100) @@ -401,10 +436,12 @@ class TestTaskRetryAndAbort(BaseTest): def test_task_retry_default_interval(self, workflow_context, executor): default_retry_interval = 0.1 + node, _, operation_name = self._create_interface( + workflow_context, mock_task_retry, {'message': self.message}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_task_retry, + op = self._op(node, operation_name, arguments={'message': self.message}, retry_interval=default_retry_interval, max_attempts=2) @@ -425,10 +462,13 @@ class TestTaskRetryAndAbort(BaseTest): def test_task_retry_custom_interval(self, workflow_context, executor): default_retry_interval = 100 custom_retry_interval = 0.1 + node, _, operation_name = self._create_interface( + workflow_context, mock_task_retry, {'message': self.message, + 'retry_interval': custom_retry_interval}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_task_retry, + op = self._op(node, operation_name, arguments={'message': self.message, 'retry_interval': custom_retry_interval}, retry_interval=default_retry_interval, @@ -449,9 +489,11 @@ class TestTaskRetryAndAbort(BaseTest): assert global_test_holder.get('sent_task_signal_calls') == 2 def test_task_abort(self, workflow_context, executor): + node, _, operation_name = self._create_interface( + workflow_context, mock_task_abort, {'message': self.message}) @workflow def mock_workflow(ctx, graph): - op = self._op(ctx, func=mock_task_abort, + op = self._op(node, operation_name, arguments={'message': self.message}, retry_interval=100, max_attempts=100) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index 7969457..5f0b75f 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -32,19 +32,23 @@ def test_decorate_extension(context, executor): def get_node(ctx): return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + node = get_node(context) + interface_name = 'test_interface' + operation_name = 'operation' + interface = mock.models.create_interface( + context.service, + interface_name, + operation_name, + operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__), + arguments=arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + + @workflow def mock_workflow(ctx, graph): node = get_node(ctx) - interface_name = 'test_interface' - operation_name = 'operation' - interface = mock.models.create_interface( - ctx.service, - interface_name, - operation_name, - operation_kwargs=dict(function='{0}.{1}'.format(__name__, _mock_operation.__name__), - arguments=arguments) - ) - node.interfaces[interface.name] = interface task = api.task.OperationTask( node, interface_name=interface_name, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 2d80a3b..7dbcc5a 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -83,20 +83,22 @@ def test_apply_tracked_changes_during_an_operation(context, executor): def _run_workflow(context, executor, op_func, arguments=None): + node = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) + interface_name = 'test_interface' + operation_name = 'operation' + wf_arguments = arguments or {} + interface = mock.models.create_interface( + context.service, + interface_name, + operation_name, + operation_kwargs=dict(function=_operation_mapping(op_func), + arguments=wf_arguments) + ) + node.interfaces[interface.name] = interface + context.model.node.update(node) + @workflow def mock_workflow(ctx, graph): - node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME) - interface_name = 'test_interface' - operation_name = 'operation' - wf_arguments = arguments or {} - interface = mock.models.create_interface( - ctx.service, - interface_name, - operation_name, - operation_kwargs=dict(function=_operation_mapping(op_func), - arguments=wf_arguments) - ) - node.interfaces[interface.name] = interface task = api.task.OperationTask( node, interface_name=interface_name, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/2149a5ee/tests/storage/test_collection_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_collection_instrumentation.py b/tests/storage/test_collection_instrumentation.py new file mode 100644 index 0000000..e915421 --- /dev/null +++ b/tests/storage/test_collection_instrumentation.py @@ -0,0 +1,257 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria.modeling import models +from aria.storage import collection_instrumentation + + +class MockActor(object): + def __init__(self): + self.dict_ = {} + self.list_ = [] + + +class MockMAPI(object): + + def __init__(self): + pass + + def put(self, *args, **kwargs): + pass + + def update(self, *args, **kwargs): + pass + + +class CollectionInstrumentation(object): + + @pytest.fixture + def actor(self): + return MockActor() + + @pytest.fixture + def model(self): + return MockMAPI() + + @pytest.fixture + def dict_(self, actor, model): + return collection_instrumentation._InstrumentedDict(model, actor, 'dict_', models.Attribute) + + @pytest.fixture + def list_(self, actor, model): + return collection_instrumentation._InstrumentedList(model, actor, 'list_', models.Attribute) + + +class TestDict(CollectionInstrumentation): + + def test_keys(self, actor, dict_): + dict_.update( + { + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key2', 'value2') + } + ) + assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys()) + + def test_values(self, actor, dict_): + dict_.update({ + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') + }) + assert (sorted(dict_.values()) == + sorted(['value1', 'value2']) == + sorted(v.value for v in actor.dict_.values())) + + def test_items(self, dict_): + dict_.update({ + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') + }) + assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')]) + + def test_iter(self, actor, dict_): + dict_.update({ + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') + }) + assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys()) + + def test_bool(self, dict_): + assert not dict_ + dict_.update({ + 'key1': models.Attribute.wrap('key1', 'value1'), + 'key2': models.Attribute.wrap('key1', 'value2') + }) + assert dict_ + + def test_set_item(self, actor, dict_): + dict_['key1'] = models.Attribute.wrap('key1', 'value1') + assert dict_['key1'] == 'value1' == actor.dict_['key1'].value + assert isinstance(actor.dict_['key1'], models.Attribute) + + def test_nested(self, actor, dict_): + dict_['key'] = {} + assert isinstance(actor.dict_['key'], models.Attribute) + assert dict_['key'] == actor.dict_['key'].value == {} + + dict_['key']['inner_key'] = 'value' + + assert len(dict_) == 1 + assert 'inner_key' in dict_['key'] + assert dict_['key']['inner_key'] == 'value' + assert dict_['key'].keys() == ['inner_key'] + assert dict_['key'].values() == ['value'] + assert dict_['key'].items() == [('inner_key', 'value')] + assert isinstance(actor.dict_['key'], models.Attribute) + assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict) + + dict_['key'].update({'updated_key': 'updated_value'}) + assert len(dict_) == 1 + assert 'updated_key' in dict_['key'] + assert dict_['key']['updated_key'] == 'updated_value' + assert sorted(dict_['key'].keys()) == sorted(['inner_key', 'updated_key']) + assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value']) + assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'), + ('updated_key', 'updated_value')]) + assert isinstance(actor.dict_['key'], models.Attribute) + assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict) + + dict_.update({'key': 'override_value'}) + assert len(dict_) == 1 + assert 'key' in dict_ + assert dict_['key'] == 'override_value' + assert len(actor.dict_) == 1 + assert isinstance(actor.dict_['key'], models.Attribute) + assert actor.dict_['key'].value == 'override_value' + + def test_get_item(self, actor, dict_): + dict_['key1'] = models.Attribute.wrap('key1', 'value1') + assert isinstance(actor.dict_['key1'], models.Attribute) + + def test_update(self, actor, dict_): + dict_['key1'] = 'value1' + + new_dict = {'key2': 'value2'} + dict_.update(new_dict) + assert len(dict_) == 2 + assert dict_['key2'] == 'value2' + assert isinstance(actor.dict_['key2'], models.Attribute) + + new_dict = {} + new_dict.update(dict_) + assert new_dict['key1'] == dict_['key1'] + + def test_copy(self, dict_): + dict_['key1'] = 'value1' + + new_dict = dict_.copy() + assert new_dict is not dict_ + assert new_dict == dict_ + + dict_['key1'] = 'value2' + assert new_dict['key1'] == 'value1' + assert dict_['key1'] == 'value2' + + def test_clear(self, dict_): + dict_['key1'] = 'value1' + dict_.clear() + + assert len(dict_) == 0 + + +class TestList(CollectionInstrumentation): + + def test_append(self, actor, list_): + list_.append(models.Attribute.wrap('name', 'value1')) + list_.append('value2') + assert len(actor.list_) == 2 + assert len(list_) == 2 + assert isinstance(actor.list_[0], models.Attribute) + assert list_[0] == 'value1' + + assert isinstance(actor.list_[1], models.Attribute) + assert list_[1] == 'value2' + + list_[0] = 'new_value1' + list_[1] = 'new_value2' + assert isinstance(actor.list_[1], models.Attribute) + assert isinstance(actor.list_[1], models.Attribute) + assert list_[0] == 'new_value1' + assert list_[1] == 'new_value2' + + def test_iter(self, list_): + list_.append('value1') + list_.append('value2') + assert sorted(list_) == sorted(['value1', 'value2']) + + def test_insert(self, actor, list_): + list_.append('value1') + list_.insert(0, 'value2') + list_.insert(2, 'value3') + list_.insert(10, 'value4') + assert sorted(list_) == sorted(['value1', 'value2', 'value3', 'value4']) + assert len(actor.list_) == 4 + + def test_set(self, list_): + list_.append('value1') + list_.append('value2') + + list_[1] = 'value3' + assert len(list_) == 2 + assert sorted(list_) == sorted(['value1', 'value3']) + + def test_insert_into_nested(self, actor, list_): + list_.append([]) + + list_[0].append('inner_item') + assert isinstance(actor.list_[0], models.Attribute) + assert len(list_) == 1 + assert list_[0][0] == 'inner_item' + + list_[0].append('new_item') + assert isinstance(actor.list_[0], models.Attribute) + assert len(list_) == 1 + assert list_[0][1] == 'new_item' + + assert list_[0] == ['inner_item', 'new_item'] + assert ['inner_item', 'new_item'] == list_[0] + + +class TestDictList(CollectionInstrumentation): + def test_dict_in_list(self, actor, list_): + list_.append({}) + assert len(list_) == 1 + assert isinstance(actor.list_[0], models.Attribute) + assert actor.list_[0].value == {} + + list_[0]['key'] = 'value' + assert list_[0]['key'] == 'value' + assert len(actor.list_) == 1 + assert isinstance(actor.list_[0], models.Attribute) + assert actor.list_[0].value['key'] == 'value' + + def test_list_in_dict(self, actor, dict_): + dict_['key'] = [] + assert len(dict_) == 1 + assert isinstance(actor.dict_['key'], models.Attribute) + assert actor.dict_['key'].value == [] + + dict_['key'].append('value') + assert dict_['key'][0] == 'value' + assert len(actor.dict_) == 1 + assert isinstance(actor.dict_['key'], models.Attribute) + assert actor.dict_['key'].value[0] == 'value'