Mailing-List: contact dev-help@ariatosca.incubator.apache.org; run by ezmlm Reply-To: dev@ariatosca.incubator.apache.org Delivered-To: mailing list dev@ariatosca.incubator.apache.org
From: mxmrlv@apache.org To: dev@ariatosca.incubator.apache.org Date: Tue, 15 Nov 2016 15:45:55 -0000 Subject: [2/2] incubator-ariatosca git commit: ARIATOSCA-9-API-for-operation-context archived-at: Tue, 15 Nov 2016 15:46:13 -0000 ARIATOSCA-9-API-for-operation-context An API for the operation context: users can now use the operation context in their operations. Additional changes: - Introduced the toolbelt for operations, providing useful functions for working within an operation. - Added a mechanism that simplifies the creation of user-defined tasks, using the node_instance/relationship_instance classmethods. - Raise exceptions when trying to access a non-existent resource in the storage. 
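The toolbelt dispatch introduced by this commit (in aria/context/toolbelt.py) can be sketched standalone. The classes below are illustrative stand-ins, not the real aria models; only the `toolbelt()` selection logic mirrors the commit:

```python
# Minimal sketch of the toolbelt dispatch added in this commit.
# NodeOperationContext / RelationshipOperationContext here are stand-ins
# for the real classes in aria/context/operation.py.

class NodeOperationContext(object):
    """Stand-in for the node-based operation context."""


class RelationshipOperationContext(object):
    """Stand-in for the relationship-based operation context."""


class NodeToolBelt(object):
    """Node-operation tool belt (stand-in body)."""
    def __init__(self, operation_context):
        self._op_context = operation_context


class RelationshipToolBelt(object):
    """Relationship-operation tool belt (stand-in body)."""
    def __init__(self, operation_context):
        self._op_context = operation_context


def toolbelt(operation_context):
    # Pick the tool belt matching the current operation context type;
    # any other context type is rejected, as in the commit.
    if isinstance(operation_context, NodeOperationContext):
        return NodeToolBelt(operation_context)
    elif isinstance(operation_context, RelationshipOperationContext):
        return RelationshipToolBelt(operation_context)
    else:
        raise RuntimeError('Operation context not supported')
```

In the commit itself, operations opt in via the reworked decorator, `@operation(toolbelt=True)`, which resolves the matching tool belt from the operation's `ctx` and injects it as a `toolbelt` keyword argument.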
Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5956f889
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5956f889
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5956f889

Branch: refs/heads/ARIA-9-API-for-operation-context
Commit: 5956f88995f13bcaab0001dfb64a886fd350e1c8
Parents: b9c135f
Author: mxmrlv
Authored: Thu Oct 27 20:39:09 2016 +0300
Committer: mxmrlv
Committed: Tue Nov 15 17:44:14 2016 +0200

----------------------------------------------------------------------
 aria/.pylintrc                                  |   2 +-
 aria/context/__init__.py                        |   1 +
 aria/context/common.py                          | 147 ++++++++++++++++
 aria/context/operation.py                       | 123 +++++++++----
 aria/context/toolbelt.py                        |  75 ++++++++
 aria/context/workflow.py                        | 126 ++------------
 aria/decorators.py                              |  36 ++--
 aria/events/builtin_event_handler.py            |   8 +-
 aria/storage/drivers.py                         |  10 +-
 aria/storage/models.py                          |   7 +-
 aria/storage/structures.py                      |   1 -
 aria/workflows/api/task.py                      |  74 ++++++--
 aria/workflows/builtin/heal.py                  |  10 +-
 aria/workflows/builtin/workflows.py             | 137 ++++++---------
 aria/workflows/core/__init__.py                 |   2 +-
 aria/workflows/core/task.py                     | 107 +++++++-----
 aria/workflows/executor/__init__.py             |   7 +
 aria/workflows/executor/blocking.py             |   4 +-
 aria/workflows/executor/celery.py               |   6 +-
 aria/workflows/executor/multiprocess.py         |   9 +-
 aria/workflows/executor/thread.py               |   4 +-
 tests/.pylintrc                                 |   2 +-
 tests/context/__init__.py                       |  21 +++
 tests/context/test_operation.py                 | 156 +++++++++++++++++
 tests/context/test_toolbelt.py                  | 171 +++++++++++++++++++
 tests/mock/context.py                           |   1 +
 tests/mock/models.py                            |  48 ++++--
 tests/storage/test_models.py                    |   7 +-
 tests/storage/test_resource_storage.py          |  12 ++
 tests/workflows/api/test_task.py                |  78 ++++++--
 .../workflows/builtin/test_execute_operation.py |   2 +-
 tests/workflows/core/test_engine.py             |  57 ++++---
 tests/workflows/core/test_task.py               | 113 ++++++++++++
 .../test_task_graph_into_exececution_graph.py   |  18 +-
 tests/workflows/executor/test_executor.py       |  13 +-
 35 files changed, 1184 insertions(+), 411 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/.pylintrc
----------------------------------------------------------------------
diff --git a/aria/.pylintrc b/aria/.pylintrc
index e5ee9de..bf90513 100644
--- a/aria/.pylintrc
+++ b/aria/.pylintrc
@@ -77,7 +77,7 @@ confidence=
 # --enable=similarities". If you want to run only the classes checker, but have
 # no Warning level messages displayed, use"--disable=all --enable=classes
 # --disable=W"
-disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,logging-format-interpolation,import-error
+disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,logging-format-interpolation,import-error,protected-access [REPORTS] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/context/__init__.py ---------------------------------------------------------------------- diff --git a/aria/context/__init__.py b/aria/context/__init__.py index 20e19db..ad89b13 100644 --- a/aria/context/__init__.py +++ b/aria/context/__init__.py @@ -18,3 +18,4 @@ Provides contexts to workflow and operation """ from . import workflow, operation +from .toolbelt import toolbelt http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/context/common.py ---------------------------------------------------------------------- diff --git a/aria/context/common.py b/aria/context/common.py new file mode 100644 index 0000000..6e9b86a --- /dev/null +++ b/aria/context/common.py @@ -0,0 +1,147 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +A common context for both workflow and operation +""" +from uuid import uuid4 + +from .. import ( + logger, + exceptions, +) +from ..tools.lru_cache import lru_cache + + +class BaseContext(logger.LoggerMixin): + """ + Base context object for workflow and operation + """ + + def __init__( + self, + name, + model_storage, + resource_storage, + deployment_id, + workflow_id, + execution_id=None, + task_max_attempts=1, + task_retry_interval=0, + task_ignore_failure=False, + **kwargs): + super(BaseContext, self).__init__(**kwargs) + self._name = name + self._id = str(uuid4()) + self._model = model_storage + self._resource = resource_storage + self._deployment_id = deployment_id + self._workflow_id = workflow_id + self._execution_id = execution_id or str(uuid4()) + self._task_max_attempts = task_max_attempts + self._task_retry_interval = task_retry_interval + self._task_ignore_failure = task_ignore_failure + + def __repr__(self): + return ( + '{name}(name={self.name}, ' + 'deployment_id={self._deployment_id}, ' + 'workflow_id={self._workflow_id}, ' + 'execution_id={self._execution_id})' + .format(name=self.__class__.__name__, self=self)) + + @property + def model(self): + """ + Access to the model storage + :return: + """ + return self._model + + @property + def resource(self): + """ + Access to the resource storage + :return: + """ + return self._resource + + @property + @lru_cache() + def 
blueprint(self): + """ + The blueprint model + """ + return self.model.blueprint.get(self.deployment.blueprint_id) + + @property + @lru_cache() + def deployment(self): + """ + The deployment model + """ + return self.model.deployment.get(self._deployment_id) + + @property + def execution(self): + """ + The execution model + """ + return self.model.execution.get(self._execution_id) + + @execution.setter + def execution(self, value): + """ + Store the execution in the model storage + """ + self.model.execution.store(value) + + @property + def name(self): + """ + The operation name + :return: + """ + return self._name + + @property + def id(self): + """ + The operation id + :return: + """ + return self._id + + def download_resource(self, destination, path=None): + """ + Download a blueprint resource from the resource storage + """ + try: + return self.resource.deployment.download(entry_id=self.deployment.id, + destination=destination, + path=path) + except exceptions.StorageError: + return self.resource.blueprint.download(entry_id=self.blueprint.id, + destination=destination, + path=path) + + @lru_cache() + def get_resource(self, path=None): + """ + Read a deployment resource as string from the resource storage + """ + try: + return self.resource.deployment.data(entry_id=self.deployment.id, path=path) + except exceptions.StorageError: + return self.resource.blueprint.data(entry_id=self.blueprint.id, path=path) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/context/operation.py b/aria/context/operation.py index d4d229a..bf3686d 100644 --- a/aria/context/operation.py +++ b/aria/context/operation.py @@ -17,52 +17,111 @@ Workflow and operation contexts """ -from uuid import uuid4 -from aria.logger import LoggerMixin +from .common import BaseContext -class OperationContext(LoggerMixin): + +class BaseOperationContext(BaseContext): """ 
Context object used during operation creation and execution """ - def __init__( - self, - name, - operation_details, - workflow_context, - node_instance, - inputs=None): - super(OperationContext, self).__init__() - self.name = name - self.id = str(uuid4()) - self.operation_details = operation_details - self.workflow_context = workflow_context - self.node_instance = node_instance - self.inputs = inputs or {} + def __init__(self, name, workflow_context, task, **kwargs): + super(BaseOperationContext, self).__init__( + name=name, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + deployment_id=workflow_context._deployment_id, + workflow_id=workflow_context._workflow_id, + execution_id=workflow_context._execution_id, + **kwargs) + self._task_model = task + self._actor = self.task.actor def __repr__(self): - details = ', '.join( - '{0}={1}'.format(key, value) - for key, value in self.operation_details.items()) + details = 'operation_mapping={task.operation_mapping}; ' \ + 'operation_inputs={task.inputs}'\ + .format(task=self.task) return '{name}({0})'.format(details, name=self.name) - def __getattr__(self, attr): - try: - return getattr(self.workflow_context, attr) - except AttributeError: - return super(OperationContext, self).__getattribute__(attr) + @property + def task(self): + """ + The task in the model storage + :return: Task model + """ + return self._task_model + + +class NodeOperationContext(BaseOperationContext): + """ + Context for node based operations. + """ + @property + def node(self): + """ + the node of the current operation + :return: + """ + return self._actor.node + + @property + def node_instance(self): + """ + The node instance of the current operation + :return: + """ + return self._actor + +class RelationshipOperationContext(BaseOperationContext): + """ + Context for relationship based operations. 
+ """ @property - def operation(self): + def source_node(self): """ - The model operation + The source node + :return: """ - return self.storage.operation.get(self.id) + return self.model.node.get(self.relationship.source_id) - @operation.setter - def operation(self, value): + @property + def source_node_instance(self): + """ + The source node instance + :return: + """ + return self.model.node_instance.get(self.relationship_instance.source_id) + + @property + def target_node(self): + """ + The target node + :return: + """ + return self.model.node.get(self.relationship.target_id) + + @property + def target_node_instance(self): + """ + The target node instance + :return: + """ + return self.model.node_instance.get(self._actor.target_id) + + @property + def relationship(self): + """ + The relationship of the current operation + :return: + """ + return self._actor.relationship + + @property + def relationship_instance(self): """ - Store the operation in the model storage + The relationship instance of the current operation + :return: """ - self.storage.operation.store(value) + return self._actor http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/context/toolbelt.py b/aria/context/toolbelt.py new file mode 100644 index 0000000..0aad89c --- /dev/null +++ b/aria/context/toolbelt.py @@ -0,0 +1,75 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +""" +Provides with different tools for operations. +""" + +from . import operation + + +class NodeToolBelt(object): + """ + Node operation related tool belt + """ + def __init__(self, operation_context): + self._op_context = operation_context + + @property + def dependent_node_instances(self): + """ + Any node instance which has a relationship to the current node instance. + :return: + """ + assert isinstance(self._op_context, operation.NodeOperationContext) + node_instances = self._op_context.model.node_instance.iter( + filters={'deployment_id': self._op_context.deployment.id} + ) + for node_instance in node_instances: + for relationship_instance in node_instance.relationship_instances: + if relationship_instance.target_id == self._op_context.node_instance.id: + yield node_instance + + @property + def host_ip(self): + """ + The host ip of the current node + :return: + """ + assert isinstance(self._op_context, operation.NodeOperationContext) + host_id = self._op_context._actor.host_id + host_instance = self._op_context.model.node_instance.get(host_id) + return host_instance.runtime_properties.get('ip') + + +class RelationshipToolBelt(object): + """ + Relationship operation related tool belt + """ + def __init__(self, operation_context): + self._op_context = operation_context + + +def toolbelt(operation_context): + """ + Get a toolbelt according to the current operation executor + :param operation_context: + :return: + """ + if isinstance(operation_context, operation.NodeOperationContext): + return NodeToolBelt(operation_context) + elif 
isinstance(operation_context, operation.RelationshipOperationContext): + return RelationshipToolBelt(operation_context) + else: + raise RuntimeError("Operation context not supported") http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/context/workflow.py b/aria/context/workflow.py index b84a5fb..0495bdc 100644 --- a/aria/context/workflow.py +++ b/aria/context/workflow.py @@ -18,12 +18,11 @@ Workflow and operation contexts """ import threading -from uuid import uuid4 from contextlib import contextmanager -from .. import logger -from ..tools.lru_cache import lru_cache -from .. import exceptions +from aria import exceptions + +from .common import BaseContext class ContextException(exceptions.AriaError): @@ -33,145 +32,52 @@ class ContextException(exceptions.AriaError): pass -class WorkflowContext(logger.LoggerMixin): +class WorkflowContext(BaseContext): """ Context object used during workflow creation and execution """ - - def __init__( - self, - name, - model_storage, - resource_storage, - deployment_id, - workflow_id, - execution_id=None, - parameters=None, - task_max_attempts=1, - task_retry_interval=0, - task_ignore_failure=False, - **kwargs): - super(WorkflowContext, self).__init__(**kwargs) - self.name = name - self.id = str(uuid4()) - self.model = model_storage - self.resource = resource_storage - self.deployment_id = deployment_id - self.workflow_id = workflow_id - self.execution_id = execution_id or str(uuid4()) + def __init__(self, parameters=None, *args, **kwargs): + super(WorkflowContext, self).__init__(*args, **kwargs) self.parameters = parameters or {} - self.task_max_attempts = task_max_attempts - self.task_retry_interval = task_retry_interval - self.task_ignore_failure = task_ignore_failure # TODO: execution creation should happen somewhere else # should be moved there, when such logical place exists try: - 
self.model.execution.get(self.execution_id) + self.model.execution.get(self._execution_id) except exceptions.StorageError: self._create_execution() def __repr__(self): return ( - '{name}(deployment_id={self.deployment_id}, ' - 'workflow_id={self.workflow_id}, ' - 'execution_id={self.execution_id})'.format( + '{name}(deployment_id={self._deployment_id}, ' + 'workflow_id={self._workflow_id}, ' + 'execution_id={self._execution_id})'.format( name=self.__class__.__name__, self=self)) def _create_execution(self): execution_cls = self.model.execution.model_cls execution = self.model.execution.model_cls( - id=self.execution_id, - deployment_id=self.deployment_id, - workflow_id=self.workflow_id, - blueprint_id=self.blueprint_id, + id=self._execution_id, + deployment_id=self.deployment.id, + workflow_id=self._workflow_id, + blueprint_id=self.blueprint.id, status=execution_cls.PENDING, parameters=self.parameters, ) self.model.execution.store(execution) @property - def blueprint_id(self): - """ - The blueprint id - """ - return self.deployment.blueprint_id - - @property - @lru_cache() - def blueprint(self): - """ - The blueprint model - """ - return self.model.blueprint.get(self.blueprint_id) - - @property - @lru_cache() - def deployment(self): - """ - The deployment model - """ - return self.model.deployment.get(self.deployment_id) - - @property def nodes(self): """ Iterator over nodes """ - return self.model.node.iter( - filters={'blueprint_id': self.blueprint_id}) + return self.model.node.iter(filters={'blueprint_id': self.blueprint.id}) @property def node_instances(self): """ Iterator over node instances """ - return self.model.node_instance.iter(filters={'deployment_id': self.deployment_id}) - - @property - def execution(self): - """ - The execution model - """ - return self.model.execution.get(self.execution_id) - - @execution.setter - def execution(self, value): - """ - Store the execution in the model storage - """ - self.model.execution.store(value) - - def 
download_blueprint_resource(self, destination, path=None): - """ - Download a blueprint resource from the resource storage - """ - return self.resource.blueprint.download( - entry_id=self.blueprint_id, - destination=destination, - path=path) - - def download_deployment_resource(self, destination, path=None): - """ - Download a deployment resource from the resource storage - """ - return self.resource.deployment.download( - entry_id=self.deployment_id, - destination=destination, - path=path) - - @lru_cache() - def get_deployment_resource_data(self, path=None): - """ - Read a deployment resource as string from the resource storage - """ - return self.resource.deployment.data(entry_id=self.deployment_id, path=path) - - @lru_cache() - def get_blueprint_resource_data(self, path=None): - """ - Read a blueprint resource as string from the resource storage - """ - return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path) + return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id}) class _CurrentContext(threading.local): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/decorators.py ---------------------------------------------------------------------- diff --git a/aria/decorators.py b/aria/decorators.py index a07e2ee..8bde0ef 100644 --- a/aria/decorators.py +++ b/aria/decorators.py @@ -25,23 +25,17 @@ from .workflows.api import task_graph from .tools.validation import validate_function_arguments -def workflow( - func=None, - simple_workflow=True, - suffix_template=''): +def workflow(func=None, suffix_template=''): """ Workflow decorator """ if func is None: - return partial( - workflow, - simple_workflow=simple_workflow, - suffix_template=suffix_template) + return partial(workflow, suffix_template=suffix_template) @wraps(func) def _wrapper(ctx, **workflow_parameters): - workflow_name = _generate_workflow_name( + workflow_name = _generate_name( func_name=func.__name__, suffix_template=suffix_template, 
ctx=ctx, @@ -56,34 +50,24 @@ def workflow( return _wrapper -def operation( - func=None): +def operation(func=None, toolbelt=False, suffix_template=''): """ Operation decorator """ if func is None: - return partial(operation) + return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt) @wraps(func) - def _wrapper(ctx, **custom_kwargs): - func_kwargs = _create_func_kwargs( - custom_kwargs, - ctx) + def _wrapper(**func_kwargs): + if toolbelt: + operation_toolbelt = context.toolbelt(func_kwargs['ctx']) + func_kwargs.setdefault('toolbelt', operation_toolbelt) validate_function_arguments(func, func_kwargs) - ctx.description = func.__doc__ return func(**func_kwargs) return _wrapper -def _generate_workflow_name(func_name, ctx, suffix_template, **custom_kwargs): +def _generate_name(func_name, ctx, suffix_template, **custom_kwargs): return '{func_name}.{suffix}'.format( func_name=func_name, suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4())) - - -def _create_func_kwargs( - kwargs, - ctx, - workflow_name=None): - kwargs.setdefault('graph', ctx.task_graph(workflow_name)) - return kwargs http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/events/builtin_event_handler.py ---------------------------------------------------------------------- diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py index 2f9a3be..c5cccfe 100644 --- a/aria/events/builtin_event_handler.py +++ b/aria/events/builtin_event_handler.py @@ -41,20 +41,20 @@ from . 
import ( @sent_task_signal.connect def _task_sent(task, *args, **kwargs): - with task.update(): + with task._update(): task.status = task.SENT @start_task_signal.connect def _task_started(task, *args, **kwargs): - with task.update(): + with task._update(): task.started_at = datetime.utcnow() task.status = task.STARTED @on_failure_task_signal.connect def _task_failed(task, *args, **kwargs): - with task.update(): + with task._update(): should_retry = ( (task.retry_count < task.max_attempts - 1 or task.max_attempts == task.INFINITE_RETRIES) and @@ -72,7 +72,7 @@ def _task_failed(task, *args, **kwargs): @on_success_task_signal.connect def _task_succeeded(task, *args, **kwargs): - with task.update(): + with task._update(): task.ended_at = datetime.utcnow() task.status = task.SUCCESS http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/storage/drivers.py ---------------------------------------------------------------------- diff --git a/aria/storage/drivers.py b/aria/storage/drivers.py index 0ad2f49..8b7d3af 100644 --- a/aria/storage/drivers.py +++ b/aria/storage/drivers.py @@ -369,7 +369,10 @@ class FileSystemResourceDriver(ResourceDriver, BaseFileSystemDriver): :return: the content of the file :rtype: bytes """ - resource = os.path.join(self.directory, entry_type, entry_id, path or '') + resource_relative_path = os.path.join(entry_type, entry_id, path or '') + resource = os.path.join(self.directory, resource_relative_path) + if not os.path.exists(resource): + raise StorageError("Resource {0} does not exist".format(resource_relative_path)) if not os.path.isfile(resource): resources = os.listdir(resource) if len(resources) != 1: @@ -387,7 +390,10 @@ class FileSystemResourceDriver(ResourceDriver, BaseFileSystemDriver): :param basestring destination: the destination of the files. :param basestring path: a path on the remote machine relative to the root of the entry. 
""" - resource = os.path.join(self.directory, entry_type, entry_id, path or '') + resource_relative_path = os.path.join(entry_type, entry_id, path or '') + resource = os.path.join(self.directory, resource_relative_path) + if not os.path.exists(resource): + raise StorageError("Resource {0} does not exist".format(resource_relative_path)) if os.path.isfile(resource): shutil.copy2(resource, destination) else: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/storage/models.py ---------------------------------------------------------------------- diff --git a/aria/storage/models.py b/aria/storage/models.py index 94a9aa0..d24ad75 100644 --- a/aria/storage/models.py +++ b/aria/storage/models.py @@ -241,6 +241,7 @@ class Relationship(Model): A Model which represents a relationship """ id = Field(type=basestring, default=uuid_generator) + source_id = Field(type=basestring) target_id = Field(type=basestring) source_interfaces = Field(type=dict) source_operations = Field(type=dict) @@ -290,6 +291,8 @@ class RelationshipInstance(Model): id = Field(type=basestring, default=uuid_generator) target_id = Field(type=basestring) target_name = Field(type=basestring) + source_id = Field(type=basestring) + source_name = Field(type=basestring) type = Field(type=basestring) relationship = PointerField(type=Relationship) @@ -436,6 +439,6 @@ class Task(Model): # Operation specific fields name = Field(type=basestring) - operation_details = Field(type=dict) - node_instance = PointerField(type=NodeInstance) + operation_mapping = Field(type=basestring) + actor = Field() inputs = Field(type=dict, default=lambda: {}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/storage/structures.py ---------------------------------------------------------------------- diff --git a/aria/storage/structures.py b/aria/storage/structures.py index a26e7eb..399922e 100644 --- a/aria/storage/structures.py +++ b/aria/storage/structures.py @@ -26,7 +26,6 @@ 
classes: * IterPointerField - represents an iterable pointers field. * Model - abstract model implementation. """ - import json from uuid import uuid4 from itertools import count http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py index 7e9b8e0..f6bf996 100644 --- a/aria/workflows/api/task.py +++ b/aria/workflows/api/task.py @@ -18,7 +18,11 @@ Provides the tasks to be entered into the task graph """ from uuid import uuid4 -from ... import context +from ... import ( + context, + storage, + exceptions, +) class BaseTask(object): @@ -54,10 +58,13 @@ class OperationTask(BaseTask): Represents an operation task in the task_graph """ + SOURCE_OPERATION = 'source_operations' + TARGET_OPERATION = 'target_operations' + def __init__(self, name, - operation_details, - node_instance, + actor, + operation_mapping, max_attempts=None, retry_interval=None, ignore_failure=None, @@ -66,21 +73,68 @@ class OperationTask(BaseTask): Creates an operation task using the name, details, node instance and any additional kwargs. :param name: the operation of the name. :param operation_details: the details for the operation. - :param node_instance: the node instance on which this operation is registered. + :param actor: the operation host on which this operation is registered. :param inputs: operation inputs. 
""" + assert isinstance(actor, (storage.models.NodeInstance, + storage.models.RelationshipInstance)) super(OperationTask, self).__init__() - self.name = name - self.operation_details = operation_details - self.node_instance = node_instance + self.actor = actor + self.name = '{name}.{actor.id}'.format(name=name, actor=actor) + self.operation_mapping = operation_mapping self.inputs = inputs or {} - self.max_attempts = (self.workflow_context.task_max_attempts + self.max_attempts = (self.workflow_context._task_max_attempts if max_attempts is None else max_attempts) - self.retry_interval = (self.workflow_context.task_retry_interval + self.retry_interval = (self.workflow_context._task_retry_interval if retry_interval is None else retry_interval) - self.ignore_failure = (self.workflow_context.task_ignore_failure + self.ignore_failure = (self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure) + @classmethod + def node_instance(cls, instance, name, inputs=None, *args, **kwargs): + """ + Represents a node based operation + + :param instance: the node of which this operation belongs to. + :param name: the name of the operation. + """ + assert isinstance(instance, storage.models.NodeInstance) + operation_details = instance.node.operations[name] + operation_inputs = operation_details.get('inputs', {}) + operation_inputs.update(inputs or {}) + return cls(name=name, + actor=instance, + operation_mapping=operation_details.get('operation', ''), + inputs=operation_inputs, + *args, + **kwargs) + + @classmethod + def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs): + """ + Represents a relationship based operation + + :param instance: the relationship of which this operation belongs to. + :param name: the name of the operation. 
+ :param operation_end: source or target end of the relationship; this corresponds directly + with 'source_operations' and 'target_operations' + :param inputs: any additional inputs to the operation + """ + assert isinstance(instance, storage.models.RelationshipInstance) + if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]: + raise exceptions.TaskException('The operation end should be {0} or {1}'.format( + cls.TARGET_OPERATION, cls.SOURCE_OPERATION + )) + operation_details = getattr(instance.relationship, operation_end)[name] + operation_inputs = operation_details.get('inputs', {}) + operation_inputs.update(inputs or {}) + return cls(actor=instance, + name=name, + operation_mapping=operation_details.get('operation'), + inputs=operation_inputs, + *args, + **kwargs) + class WorkflowTask(BaseTask): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/workflows/builtin/heal.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/heal.py b/aria/workflows/builtin/heal.py index dc320dc..dbfc14e 100644 --- a/aria/workflows/builtin/heal.py +++ b/aria/workflows/builtin/heal.py @@ -99,12 +99,9 @@ def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances): if target_node_instance in failing_node_instances: dependency = relationship_tasks( - graph=graph, - node_instance=node_instance, relationship_instance=relationship_instance, - context=ctx, operation_name='aria.interfaces.relationship_lifecycle.unlink') - + graph.add_tasks(*dependency) graph.add_dependency(node_instance_sub_workflow, dependency) @@ -154,12 +151,9 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances): if target_node_instance in failing_node_instances: dependent = relationship_tasks( - graph=graph, - node_instance=node_instance, relationship_instance=relationship_instance, - context=ctx, operation_name='aria.interfaces.relationship_lifecycle.establish') - +
graph.add_tasks(*dependent) graph.add_dependency(dependent, node_instance_sub_workflow) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/workflows/builtin/workflows.py b/aria/workflows/builtin/workflows.py index fc54f75..0eb8c34 100644 --- a/aria/workflows/builtin/workflows.py +++ b/aria/workflows/builtin/workflows.py @@ -34,7 +34,7 @@ __all__ = ( # Install node instance workflow and sub workflows @workflow(suffix_template='{node_instance.id}') -def install_node_instance(ctx, graph, node_instance): +def install_node_instance(graph, node_instance, **kwargs): """ A workflow which installs a node instance. :param WorkflowContext ctx: the workflow context @@ -42,35 +42,30 @@ def install_node_instance(ctx, graph, node_instance): :param node_instance: the node instance to install :return: """ - create_node_instance = task.OperationTask( - name='aria.interfaces.lifecycle.create.{0}'.format(node_instance.id), - operation_details=node_instance.node.operations['aria.interfaces.lifecycle.create'], - node_instance=node_instance - ) - configure_node_instance = task.OperationTask( - name='aria.interfaces.lifecycle.configure.{0}'.format(node_instance.id), - operation_details=node_instance.node.operations['aria.interfaces.lifecycle.configure'], - node_instance=node_instance - ) - start_node_instance = task.OperationTask( - name='aria.interfaces.lifecycle.start.{0}'.format(node_instance.id), - operation_details=node_instance.node.operations['aria.interfaces.lifecycle.start'], - node_instance=node_instance - ) + create_node_instance = task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.create') + + configure_node_instance = task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.configure') + start_node_instance = task.OperationTask.node_instance( + 
instance=node_instance, + name='aria.interfaces.lifecycle.start') graph.sequence( create_node_instance, - preconfigure_relationship(graph, ctx, node_instance), + preconfigure_relationship(graph, node_instance), configure_node_instance, - postconfigure_relationship(graph, ctx, node_instance), + postconfigure_relationship(graph, node_instance), start_node_instance, - establish_relationship(graph, ctx, node_instance) + establish_relationship(graph, node_instance) ) return graph -def preconfigure_relationship(graph, ctx, node_instance): +def preconfigure_relationship(graph, node_instance, **kwargs): """ :param context: @@ -81,11 +76,10 @@ def preconfigure_relationship(graph, ctx, node_instance): return relationships_tasks( graph=graph, operation_name='aria.interfaces.relationship_lifecycle.preconfigure', - context=ctx, node_instance=node_instance) -def postconfigure_relationship(graph, ctx, node_instance): +def postconfigure_relationship(graph, node_instance, **kwargs): """ :param context: @@ -96,11 +90,10 @@ def postconfigure_relationship(graph, ctx, node_instance): return relationships_tasks( graph=graph, operation_name='aria.interfaces.relationship_lifecycle.postconfigure', - context=ctx, node_instance=node_instance) -def establish_relationship(graph, ctx, node_instance): +def establish_relationship(graph, node_instance, **kwargs): """ :param context: @@ -111,40 +104,35 @@ def establish_relationship(graph, ctx, node_instance): return relationships_tasks( graph=graph, operation_name='aria.interfaces.relationship_lifecycle.establish', - context=ctx, node_instance=node_instance) # Uninstall node instance workflow and subworkflows @workflow(suffix_template='{node_instance.id}') -def uninstall_node_instance(ctx, graph, node_instance): +def uninstall_node_instance(graph, node_instance, **kwargs): """ - A workflow which uninstalls a node instance. 
- :param WorkflowContext context: the workflow context - :param TaskGraph graph: the tasks graph of which to edit - :param node_instance: the node instance to uninstall - :return: - """ - stop_node_instance = task.OperationTask( - name='aria.interfaces.lifecycle.stop.{0}'.format(node_instance.id), - operation_details=node_instance.node.operations['aria.interfaces.lifecycle.stop'], - node_instance=node_instance - ) - delete_node_instance = task.OperationTask( - name='aria.interfaces.lifecycle.delete.{0}'.format(node_instance.id), - operation_details=node_instance.node.operations['aria.interfaces.lifecycle.delete'], - node_instance=node_instance - ) + A workflow which uninstalls a node instance. + :param WorkflowContext context: the workflow context + :param TaskGraph graph: the tasks graph of which to edit + :param node_instance: the node instance to uninstall + :return: + """ + stop_node_instance = task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.stop') + delete_node_instance = task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.delete') graph.sequence( stop_node_instance, - unlink_relationship(graph, ctx, node_instance), + unlink_relationship(graph, node_instance), delete_node_instance ) -def unlink_relationship(graph, ctx, node_instance): +def unlink_relationship(graph, node_instance): """ :param context: @@ -155,7 +143,6 @@ def unlink_relationship(graph, ctx, node_instance): return relationships_tasks( graph=graph, operation_name='aria.interfaces.relationship_lifecycle.unlink', - context=ctx, node_instance=node_instance ) @@ -167,8 +154,6 @@ def execute_operation_on_instance( allow_kwargs_override): """ A workflow which executes a single operation - :param WorkflowContext context: the workflow to execute. 
- :param TaskGraph graph: the tasks graph of which to edit :param node_instance: the node instance to install :param basestring operation: the operation name :param dict operation_kwargs: @@ -179,22 +164,13 @@ def execute_operation_on_instance( if allow_kwargs_override is not None: operation_kwargs['allow_kwargs_override'] = allow_kwargs_override - task_name = '{node_instance.id}.{operation_name}'.format( - node_instance=node_instance, - operation_name=operation) - - return task.OperationTask( - name=task_name, - operation_details=node_instance.node.operations[operation], - node_instance=node_instance, + return task.OperationTask.node_instance( + instance=node_instance, + name=operation, inputs=operation_kwargs) - -def relationships_tasks(graph, - operation_name, - context, - node_instance): +def relationships_tasks(graph, operation_name, node_instance): """ Creates a relationship task (source and target) for all of a node_instance relationships. :param basestring operation_name: the relationship operation name. @@ -207,26 +183,17 @@ def relationships_tasks(graph, key=lambda relationship_instance: relationship_instance.relationship.target_id) sub_tasks = [] - for index, (_, relationship_group) in enumerate(relationships_groups): + for _, (_, relationship_group) in enumerate(relationships_groups): for relationship_instance in relationship_group: relationship_operations = relationship_tasks( - graph=graph, - node_instance=node_instance, relationship_instance=relationship_instance, - context=context, - operation_name=operation_name, - index=index) + operation_name=operation_name) sub_tasks.append(relationship_operations) return graph.sequence(*sub_tasks) -def relationship_tasks(graph, - node_instance, - relationship_instance, - context, - operation_name, - index=None): +def relationship_tasks(relationship_instance, operation_name): """ Creates a relationship task source and target. 
:param NodeInstance node_instance: the node instance of the relationship @@ -236,21 +203,13 @@ def relationship_tasks(graph, :param index: the relationship index - enables pretty print :return: """ - index = index or node_instance.relationship_instances.index(relationship_instance) - operation_name_template = '{name}.{index}.{{0}}.<{source_id}, {target_id}>'.format( + source_operation = task.OperationTask.relationship_instance( + instance=relationship_instance, name=operation_name, - index=index, - source_id=node_instance.id, - target_id=relationship_instance.target_id, - ) - source_operation = task.OperationTask( - name=operation_name_template.format('source'), - node_instance=node_instance, - operation_details=relationship_instance.relationship.source_operations[ - operation_name]) - target_operation = task.OperationTask( - name=operation_name_template.format('target'), - node_instance=context.model.node_instance.get(relationship_instance.target_id), - operation_details=relationship_instance.relationship.target_operations[ - operation_name]) - return graph.add_tasks(source_operation, target_operation) + operation_end=task.OperationTask.SOURCE_OPERATION) + target_operation = task.OperationTask.relationship_instance( + instance=relationship_instance, + name=operation_name, + operation_end=task.OperationTask.TARGET_OPERATION) + + return source_operation, target_operation http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/workflows/core/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/__init__.py b/aria/workflows/core/__init__.py index 646f44a..e377153 100644 --- a/aria/workflows/core/__init__.py +++ b/aria/workflows/core/__init__.py @@ -17,4 +17,4 @@ Core for the workflow execution mechanism """ -from . import task +from . 
import task, translation, engine http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py index 65a8ddb..9ab5697 100644 --- a/aria/workflows/core/task.py +++ b/aria/workflows/core/task.py @@ -18,12 +18,29 @@ Workflow tasks """ from contextlib import contextmanager from datetime import datetime +from functools import ( + partial, + wraps, +) from ... import logger from ...storage import models +from ...context import operation as operation_context from .. import exceptions +def _locked(func=None): + if func is None: + return partial(_locked, func=_locked) + + @wraps(func) + def _wrapper(self, value, **kwargs): + if self._update_fields is None: + raise exceptions.TaskException("Task is not in update mode") + return func(self, value, **kwargs) + return _wrapper + + class BaseTask(logger.LoggerMixin): """ Base class for Task objects @@ -80,32 +97,45 @@ class EndSubWorkflowTask(StubTask): pass -class OperationTask(BaseTask, logger.LoggerMixin): +class OperationTask(BaseTask): """ Operation tasks """ def __init__(self, api_task, *args, **kwargs): super(OperationTask, self).__init__(id=api_task.id, **kwargs) - self._workflow_ctx = api_task.workflow_context - task_model = api_task.workflow_context.model.task.model_cls - task = task_model( + self._workflow_context = api_task._workflow_context + task_model = api_task._workflow_context.model.task.model_cls + operation_task = task_model( + id=api_task.id, name=api_task.name, - operation_details=api_task.operation_details, - node_instance=api_task.node_instance, + operation_mapping=api_task.operation_mapping, + actor=api_task.actor, inputs=api_task.inputs, status=task_model.PENDING, - execution_id=self.workflow_context.execution_id, + execution_id=self._workflow_context._execution_id, max_attempts=api_task.max_attempts, 
retry_interval=api_task.retry_interval, ignore_failure=api_task.ignore_failure ) - self.workflow_context.model.task.store(task) - self._task_id = task.id + + if isinstance(api_task.actor, models.NodeInstance): + context_class = operation_context.NodeOperationContext + elif isinstance(api_task.actor, models.RelationshipInstance): + context_class = operation_context.RelationshipOperationContext + else: + raise RuntimeError('No operation context could be created for {0}' + .format(api_task.actor.model_cls)) + + self._ctx = context_class(name=api_task.name, + workflow_context=self._workflow_context, + task=operation_task) + self._workflow_context.model.task.store(operation_task) + self._task_id = operation_task.id self._update_fields = None @contextmanager - def update(self): + def _update(self): """ A context manager which puts the task into update mode, enabling fields update. :yields: None @@ -113,31 +143,32 @@ class OperationTask(BaseTask, logger.LoggerMixin): self._update_fields = {} try: yield - task = self.context + task = self.model_task for key, value in self._update_fields.items(): setattr(task, key, value) - self.context = task + self.model_task = task finally: self._update_fields = None @property - def workflow_context(self): + def model_task(self): """ - :return: the task's name + Returns the task model in storage + :return: task in storage """ - return self._workflow_ctx + return self._workflow_context.model.task.get(self._task_id) + + @model_task.setter + def model_task(self, value): + self._workflow_context.model.task.store(value) @property def context(self): """ - Returns the task model in storage - :return: task in storage + Contexts for the operation + :return: """ - return self.workflow_context.model.task.get(self._task_id) - - @context.setter - def context(self, value): - self.workflow_context.model.task.store(value) + return self._ctx @property def status(self): @@ -145,11 +176,12 @@ class OperationTask(BaseTask, logger.LoggerMixin): Returns the 
task status :return: task status """ - return self.context.status + return self.model_task.status @status.setter + @_locked def status(self, value): - self._update_property('status', value) + self._update_fields['status'] = value @property def started_at(self): @@ -157,11 +189,12 @@ class OperationTask(BaseTask, logger.LoggerMixin): Returns when the task started :return: when task started """ - return self.context.started_at + return self.model_task.started_at @started_at.setter + @_locked def started_at(self, value): - self._update_property('started_at', value) + self._update_fields['started_at'] = value @property def ended_at(self): @@ -169,11 +202,12 @@ class OperationTask(BaseTask, logger.LoggerMixin): Returns when the task ended :return: when task ended """ - return self.context.ended_at + return self.model_task.ended_at @ended_at.setter + @_locked def ended_at(self, value): - self._update_property('ended_at', value) + self._update_fields['ended_at'] = value @property def retry_count(self): @@ -181,11 +215,12 @@ class OperationTask(BaseTask, logger.LoggerMixin): Returns the retry count for the task :return: retry count """ - return self.context.retry_count + return self.model_task.retry_count @retry_count.setter + @_locked def retry_count(self, value): - self._update_property('retry_count', value) + self._update_fields['retry_count'] = value @property def due_at(self): @@ -193,19 +228,15 @@ class OperationTask(BaseTask, logger.LoggerMixin): Returns the minimum datetime in which the task can be executed :return: eta """ - return self.context.due_at + return self.model_task.due_at @due_at.setter + @_locked def due_at(self, value): - self._update_property('due_at', value) + self._update_fields['due_at'] = value def __getattr__(self, attr): try: - return getattr(self.context, attr) + return getattr(self.model_task, attr) except AttributeError: return super(OperationTask, self).__getattribute__(attr) - - def _update_property(self, key, value): - if 
self._update_fields is None: - raise exceptions.TaskException("Task is not in update mode") - self._update_fields[key] = value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/__init__.py b/aria/workflows/executor/__init__.py index ae1e83e..09fb12c 100644 --- a/aria/workflows/executor/__init__.py +++ b/aria/workflows/executor/__init__.py @@ -12,3 +12,10 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +""" +Executors for task execution +""" + + +from . import blocking, celery, multiprocess, thread http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/workflows/executor/blocking.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/blocking.py b/aria/workflows/executor/blocking.py index f072d8a..30bebbe 100644 --- a/aria/workflows/executor/blocking.py +++ b/aria/workflows/executor/blocking.py @@ -29,8 +29,8 @@ class CurrentThreadBlockingExecutor(BaseExecutor): def execute(self, task): self._task_started(task) try: - task_func = module.load_attribute(task.operation_details['operation']) - task_func(**task.inputs) + task_func = module.load_attribute(task.operation_mapping) + task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/celery.py b/aria/workflows/executor/celery.py index a82a6b7..baa97bd 100644 --- a/aria/workflows/executor/celery.py +++ b/aria/workflows/executor/celery.py @@ -44,9 +44,11 @@ 
class CeleryExecutor(BaseExecutor): def execute(self, task): self._tasks[task.id] = task + inputs = task.inputs.copy() + inputs['ctx'] = task.context self._results[task.id] = self._app.send_task( - task.operation_details['operation'], - kwargs=task.inputs, + task.operation_mapping, + kwargs=inputs, task_id=task.id, queue=self._get_queue(task)) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/workflows/executor/multiprocess.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/multiprocess.py b/aria/workflows/executor/multiprocess.py index 4af08c0..545fbf6 100644 --- a/aria/workflows/executor/multiprocess.py +++ b/aria/workflows/executor/multiprocess.py @@ -46,8 +46,9 @@ class MultiprocessExecutor(BaseExecutor): self._tasks[task.id] = task self._pool.apply_async(_multiprocess_handler, args=( self._queue, + task.context, task.id, - task.operation_details, + task.operation_mapping, task.inputs)) def close(self): @@ -86,11 +87,11 @@ class _MultiprocessMessage(object): self.exception = exception -def _multiprocess_handler(queue, task_id, operation_details, operation_inputs): +def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs): queue.put(_MultiprocessMessage(type='task_started', task_id=task_id)) try: - task_func = module.load_attribute(operation_details['operation']) - task_func(**operation_inputs) + task_func = module.load_attribute(operation_mapping) + task_func(ctx=ctx, **operation_inputs) queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id)) except BaseException as e: queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/aria/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/workflows/executor/thread.py b/aria/workflows/executor/thread.py index 180c482..6d29c1a 100644 --- 
a/aria/workflows/executor/thread.py +++ b/aria/workflows/executor/thread.py @@ -55,8 +55,8 @@ class ThreadExecutor(BaseExecutor): task = self._queue.get(timeout=1) self._task_started(task) try: - task_func = module.load_attribute(task.operation_details['operation']) - task_func(**task.inputs) + task_func = module.load_attribute(task.operation_mapping) + task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/.pylintrc ---------------------------------------------------------------------- diff --git a/tests/.pylintrc b/tests/.pylintrc index c455d8a..5556304 100644 --- a/tests/.pylintrc +++ b/tests/.pylintrc @@ -77,7 +77,7 @@ confidence= # --enable=similarities". If you want to run only the classes checker, but have # no Warning level messages displayed, use"--disable=all --enable=classes # --disable=W" -disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,no-self-use,missing-docstring,attribute-defined-outside-init,redefined-outer-name,import-error,too-many-locals 
+disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,no-self-use,missing-docstring,attribute-defined-outside-init,redefined-outer-name,import-error,too-many-locals,protected-access [REPORTS] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/context/__init__.py b/tests/context/__init__.py index ae1e83e..afd5cd7 100644 --- a/tests/context/__init__.py +++ b/tests/context/__init__.py @@ -12,3 +12,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License.
+ +import sys + +import pytest + +from aria.workflows.core import engine + + +def op_path(func, module_path=None): + module_path = module_path or sys.modules[__name__].__name__ + return '{0}.{1}'.format(module_path, func.__name__) + + +def op_name(actor, operation_name): + return '{name}.{actor.id}'.format(name=operation_name, actor=actor) + + +def execute(workflow_func, workflow_context, executor): + graph = workflow_func(ctx=workflow_context) + eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph) + eng.execute() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/context/test_operation.py ---------------------------------------------------------------------- diff --git a/tests/context/test_operation.py b/tests/context/test_operation.py new file mode 100644 index 0000000..0c34a0f --- /dev/null +++ b/tests/context/test_operation.py @@ -0,0 +1,156 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import pytest + +from aria import ( + workflow, + operation, + context, +) +from aria.workflows import api +from aria.workflows.executor import thread + +from .. import mock +from . 
import ( + op_path, + op_name, + execute, +) + +global_test_holder = {} + + +@pytest.fixture +def ctx(): + return mock.context.simple() + + +@pytest.fixture +def executor(): + result = thread.ThreadExecutor() + try: + yield result + finally: + result.close() + + +def test_node_operation_task_execution(ctx, executor): + operation_name = 'aria.interfaces.lifecycle.create' + + node = mock.models.get_dependency_node() + node.operations[operation_name] = { + 'operation': op_path(my_operation, module_path=__name__) + + } + node_instance = mock.models.get_dependency_node_instance(node) + ctx.model.node.store(node) + ctx.model.node_instance.store(node_instance) + + inputs = {'putput': True} + + @workflow + def basic_workflow(graph, **_): + graph.add_tasks( + api.task.OperationTask.node_instance( + name=operation_name, + instance=node_instance, + inputs=inputs + ) + ) + + execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor) + + operation_context = global_test_holder[op_name(node_instance, operation_name)] + + assert isinstance(operation_context, context.operation.NodeOperationContext) + + # Task bases assertions + assert operation_context.task.actor == node_instance + assert operation_context.task.name == op_name(node_instance, operation_name) + assert operation_context.task.operation_mapping == node.operations[operation_name]['operation'] + assert operation_context.task.inputs == inputs + + # Context based attributes (sugaring) + assert operation_context.node == node_instance.node + assert operation_context.node_instance == node_instance + + +def test_relationship_operation_task_execution(ctx, executor): + operation_name = 'aria.interfaces.relationship_lifecycle.postconfigure' + + dependency_node = mock.models.get_dependency_node() + dependency_node_instance = mock.models.get_dependency_node_instance() + relationship = mock.models.get_relationship(target=dependency_node) + relationship.source_operations[operation_name] = { + 'operation': 
op_path(my_operation, module_path=__name__)
+    }
+    relationship_instance = mock.models.get_relationship_instance(
+        target_instance=dependency_node_instance,
+        relationship=relationship)
+    dependent_node = mock.models.get_dependent_node()
+    dependent_node_instance = mock.models.get_dependent_node_instance(
+        relationship_instance=relationship_instance,
+        dependent_node=dependency_node)
+    ctx.model.node.store(dependency_node)
+    ctx.model.node_instance.store(dependency_node_instance)
+    ctx.model.relationship.store(relationship)
+    ctx.model.relationship_instance.store(relationship_instance)
+    ctx.model.node.store(dependent_node)
+    ctx.model.node_instance.store(dependent_node_instance)
+
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask.relationship_instance(
+                instance=relationship_instance,
+                name=operation_name,
+                operation_end=api.task.OperationTask.SOURCE_OPERATION,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+
+    operation_context = global_test_holder[op_name(relationship_instance, operation_name)]
+
+    assert isinstance(operation_context, context.operation.RelationshipOperationContext)
+
+    # Task based assertions
+    assert operation_context.task.actor == relationship_instance
+    assert operation_context.task.name == op_name(relationship_instance, operation_name)
+    assert operation_context.task.operation_mapping == \
+        relationship.source_operations[operation_name]['operation']
+    assert operation_context.task.inputs == inputs
+
+    # Context based attributes (sugaring)
+    assert operation_context.target_node == dependency_node
+    assert operation_context.target_node_instance == dependency_node_instance
+    assert operation_context.relationship == relationship
+    assert operation_context.relationship_instance == relationship_instance
+    assert operation_context.source_node == dependent_node
+    assert operation_context.source_node_instance == dependent_node_instance
+
+
+@operation
+def my_operation(ctx, **_):
+    global_test_holder[ctx.name] = ctx
+
+
+@pytest.fixture(autouse=True)
+def cleanup():
+    global_test_holder.clear()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/context/test_toolbelt.py b/tests/context/test_toolbelt.py
new file mode 100644
index 0000000..4288ce9
--- /dev/null
+++ b/tests/context/test_toolbelt.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria import workflow, operation, context
+from aria.workflows import api
+from aria.workflows.executor import thread
+from aria.context.toolbelt import RelationshipToolBelt
+
+from .. import mock
+from . import (
+    op_path,
+    op_name,
+    execute,
+)
+
+global_test_holder = {}
+
+
+@pytest.fixture
+def workflow_context():
+    return mock.context.simple()
+
+
+@pytest.fixture
+def executor():
+    result = thread.ThreadExecutor()
+    try:
+        yield result
+    finally:
+        result.close()
+
+
+def _create_simple_model_in_storage(workflow_context):
+    dependency_node = mock.models.get_dependency_node()
+    dependency_node_instance = mock.models.get_dependency_node_instance(
+        dependency_node=dependency_node)
+    relationship = mock.models.get_relationship(target=dependency_node)
+    relationship_instance = mock.models.get_relationship_instance(
+        target_instance=dependency_node_instance, relationship=relationship)
+    dependent_node = mock.models.get_dependent_node()
+    dependent_node_instance = mock.models.get_dependent_node_instance(
+        relationship_instance=relationship_instance, dependent_node=dependency_node)
+    workflow_context.model.node.store(dependency_node)
+    workflow_context.model.node_instance.store(dependency_node_instance)
+    workflow_context.model.relationship.store(relationship)
+    workflow_context.model.relationship_instance.store(relationship_instance)
+    workflow_context.model.node.store(dependent_node)
+    workflow_context.model.node_instance.store(dependent_node_instance)
+    return dependency_node, dependency_node_instance, \
+        dependent_node, dependent_node_instance, \
+        relationship, relationship_instance
+
+
+def test_host_ip(workflow_context, executor):
+    operation_name = 'aria.interfaces.lifecycle.create'
+    dependency_node, dependency_node_instance, _, _, _, _ = \
+        _create_simple_model_in_storage(workflow_context)
+    dependency_node.operations[operation_name] = {
+        'operation': op_path(host_ip, module_path=__name__)
+
+    }
+    workflow_context.model.node.store(dependency_node)
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask.node_instance(
+                instance=dependency_node_instance,
+                name=operation_name,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
+
+    assert global_test_holder.get('host_ip') == \
+        dependency_node_instance.runtime_properties.get('ip')
+
+
+def test_dependent_node_instances(workflow_context, executor):
+    operation_name = 'aria.interfaces.lifecycle.create'
+    dependency_node, dependency_node_instance, _, dependent_node_instance, _, _ = \
+        _create_simple_model_in_storage(workflow_context)
+    dependency_node.operations[operation_name] = {
+        'operation': op_path(dependent_nodes, module_path=__name__)
+
+    }
+    workflow_context.model.node.store(dependency_node)
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask.node_instance(
+                instance=dependency_node_instance,
+                name=operation_name,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
+
+    assert list(global_test_holder.get('dependent_node_instances', [])) == \
+        list([dependent_node_instance])
+
+
+def test_relationship_tool_belt(workflow_context, executor):
+    operation_name = 'aria.interfaces.relationship_lifecycle.postconfigure'
+    _, _, _, _, relationship, relationship_instance = \
+        _create_simple_model_in_storage(workflow_context)
+    relationship.source_operations[operation_name] = {
+        'operation': op_path(relationship_operation, module_path=__name__)
+    }
+    workflow_context.model.relationship.store(relationship)
+
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask.relationship_instance(
+                instance=relationship_instance,
+                name=operation_name,
+                operation_end=api.task.OperationTask.SOURCE_OPERATION,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
+
+    assert isinstance(global_test_holder.get(op_name(relationship_instance, operation_name)),
+                      RelationshipToolBelt)
+
+
+def test_wrong_model_toolbelt():
+    with pytest.raises(RuntimeError):
+        context.toolbelt(None)
+
+@operation(toolbelt=True)
+def host_ip(toolbelt, **_):
+    global_test_holder['host_ip'] = toolbelt.host_ip
+
+
+@operation(toolbelt=True)
+def dependent_nodes(toolbelt, **_):
+    global_test_holder['dependent_node_instances'] = list(toolbelt.dependent_node_instances)
+
+
+@operation(toolbelt=True)
+def relationship_operation(ctx, toolbelt, **_):
+    global_test_holder[ctx.name] = toolbelt
+
+
+@pytest.fixture(autouse=True)
+def cleanup():
+    global_test_holder.clear()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 15dbc33..c48ed9a 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -22,6 +22,7 @@ from ..storage import InMemoryModelDriver
 def simple(**kwargs):
     storage = application_model_storage(InMemoryModelDriver())
     storage.setup()
+    storage.blueprint.store(models.get_blueprint())
     storage.deployment.store(models.get_deployment())
     final_kwargs = dict(
         name='simple_context',

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 295149e..327b0b9 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -26,11 +26,16 @@ EXECUTION_ID = 'test_execution_id'
 TASK_RETRY_INTERVAL = 1
 TASK_MAX_ATTEMPTS = 1
 
+DEPENDENCY_NODE_ID = 'dependency_node'
+DEPENDENCY_NODE_INSTANCE_ID = 'dependency_node_instance'
+DEPENDENT_NODE_ID = 'dependent_node'
+DEPENDENT_NODE_INSTANCE_ID = 'dependent_node_instance'
+
 
 def get_dependency_node():
     return models.Node(
-        id='dependency_node',
-        host_id='dependency_node',
+        id=DEPENDENCY_NODE_ID,
+        host_id=DEPENDENCY_NODE_ID,
         blueprint_id=BLUEPRINT_ID,
         type='test_node_type',
         type_hierarchy=[],
@@ -47,19 +52,20 @@ def get_dependency_node():
 
 def get_dependency_node_instance(dependency_node=None):
     return models.NodeInstance(
-        id='dependency_node_instance',
-        host_id='dependency_node_instance',
+        id=DEPENDENCY_NODE_INSTANCE_ID,
+        host_id=DEPENDENCY_NODE_INSTANCE_ID,
         deployment_id=DEPLOYMENT_ID,
-        runtime_properties={},
+        runtime_properties={'ip': '1.1.1.1'},
         version=None,
         relationship_instances=[],
         node=dependency_node or get_dependency_node()
     )
 
 
-def get_relationship(target=None):
+def get_relationship(source=None, target=None):
     return models.Relationship(
-        target_id=target.id or get_dependency_node().id,
+        source_id=source.id if source is not None else DEPENDENT_NODE_ID,
+        target_id=target.id if target is not None else DEPENDENCY_NODE_ID,
         source_interfaces={},
         source_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS),
         target_interfaces={},
@@ -70,10 +76,12 @@
     )
 
 
-def get_relationship_instance(target_instance=None, relationship=None):
+def get_relationship_instance(source_instance=None, target_instance=None, relationship=None):
     return models.RelationshipInstance(
-        target_id=target_instance.id or get_dependency_node_instance().id,
+        target_id=target_instance.id if target_instance else DEPENDENCY_NODE_INSTANCE_ID,
         target_name='test_target_name',
+        source_id=source_instance.id if source_instance else DEPENDENT_NODE_INSTANCE_ID,
+        source_name='test_source_name',
         type='some_type',
         relationship=relationship or get_relationship(target_instance.node
                                                       if target_instance else None)
@@ -82,8 +90,8 @@ def get_relationship_instance(target_instance=None, relationship=None):
 
 def get_dependent_node(relationship=None):
     return models.Node(
-        id='dependent_node',
-        host_id='dependent_node',
+        id=DEPENDENT_NODE_ID,
+        host_id=DEPENDENT_NODE_ID,
         blueprint_id=BLUEPRINT_ID,
         type='test_node_type',
         type_hierarchy=[],
@@ -98,10 +106,10 @@
     )
 
 
-def get_dependent_node_instance(relationship_instance, dependent_node=None):
+def get_dependent_node_instance(relationship_instance=None, dependent_node=None):
     return models.NodeInstance(
-        id='dependent_node_instance',
-        host_id='dependent_node_instance',
+        id=DEPENDENT_NODE_INSTANCE_ID,
+        host_id=DEPENDENT_NODE_INSTANCE_ID,
         deployment_id=DEPLOYMENT_ID,
         runtime_properties={},
         version=None,
@@ -110,6 +118,18 @@
     )
 
 
+def get_blueprint():
+    now = datetime.now()
+    return models.Blueprint(
+        plan={},
+        id=BLUEPRINT_ID,
+        description=None,
+        created_at=now,
+        updated_at=now,
+        main_file_name='main_file_name'
+    )
+
+
 def get_execution():
     return models.Execution(
         id=EXECUTION_ID,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/storage/test_models.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py
index 6a879fe..f2fce90 100644
--- a/tests/storage/test_models.py
+++ b/tests/storage/test_models.py
@@ -195,6 +195,7 @@ def _relationship(id=''):
     return Relationship(
         id='rel{0}'.format(id),
         target_id='target{0}'.format(id),
+        source_id='source{0}'.format(id),
         source_interfaces={},
         source_operations={},
         target_interfaces={},
@@ -248,6 +249,8 @@ def test_relationship_instance():
     relationship_instances = [RelationshipInstance(
         id='rel{0}'.format(index),
         target_id='target_{0}'.format(index % 2),
+        source_id='source_{0}'.format(index % 2),
+        source_name='',
         target_name='',
         relationship=relationship,
         type='type{0}'.format(index)) for index in xrange(3)]
@@ -348,9 +351,9 @@ def test_task_max_attempts_validation():
     def create_task(max_attempts):
         Task(execution_id='eid',
              name='name',
-             operation_details={},
+             operation_mapping='',
             inputs={},
-             node_instance=models.get_dependency_node_instance(),
+             actor=models.get_dependency_node_instance(),
              max_attempts=max_attempts)
     create_task(max_attempts=1)
     create_task(max_attempts=2)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5956f889/tests/storage/test_resource_storage.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_resource_storage.py b/tests/storage/test_resource_storage.py
index c9ecd69..9673a26 100644
--- a/tests/storage/test_resource_storage.py
+++ b/tests/storage/test_resource_storage.py
@@ -89,6 +89,18 @@ class TestResourceStorage(TestFileSystem):
         with open(os.path.join(self.path, os.path.join(temp_dir, tmpfile_name))) as f:
             assert f.read() == 'fake context'
 
+    def test_download_non_existing_file(self):
+        storage = ResourceStorage(FileSystemResourceDriver(directory=self.path))
+        self._create(storage)
+        with pytest.raises(StorageError):
+            storage.blueprint.download(entry_id='blueprint_id', destination='', path='fake_path')
+
+    def test_data_non_existing_file(self):
+        storage = ResourceStorage(FileSystemResourceDriver(directory=self.path))
+        self._create(storage)
+        with pytest.raises(StorageError):
+            storage.blueprint.data(entry_id='blueprint_id', path='fake_path')
+
    def test_data_file(self):
        storage = ResourceStorage(FileSystemResourceDriver(directory=self.path))
        self._create(storage)
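
Editor's note: the `@operation(toolbelt=True)` mechanism exercised by the tests above can be sketched, outside of ARIA, as a decorator that builds a helper object from the operation context and injects it as a keyword argument. Everything below (`ToolBelt`, `operation`, `FakeContext`) is a simplified stand-in for illustration under that assumption, not the actual ARIA implementation:

```python
# Standalone sketch of the toolbelt-injection pattern (hypothetical names,
# not ARIA's real classes): the decorator wraps an operation function and,
# when toolbelt=True, passes it a helper built from the context.
import functools


class ToolBelt(object):
    """Convenience accessors over an operation context (simplified)."""
    def __init__(self, ctx):
        self._ctx = ctx

    @property
    def host_ip(self):
        # ARIA would consult model storage; here we read a runtime
        # property straight off the (fake) context.
        return self._ctx.runtime_properties.get('ip')


def operation(func=None, toolbelt=False):
    """Decorator: optionally inject a `toolbelt` keyword derived from `ctx`."""
    if func is None:
        # Called as @operation(toolbelt=True) -- return the real decorator.
        return functools.partial(operation, toolbelt=toolbelt)

    @functools.wraps(func)
    def wrapper(ctx, **kwargs):
        if toolbelt:
            kwargs['toolbelt'] = ToolBelt(ctx)
        return func(ctx=ctx, **kwargs)
    return wrapper


class FakeContext(object):
    """Minimal stand-in for an operation context."""
    def __init__(self, ip):
        self.runtime_properties = {'ip': ip}


@operation(toolbelt=True)
def get_host_ip(ctx, toolbelt, **_):
    return toolbelt.host_ip


print(get_host_ip(FakeContext('1.1.1.1')))  # -> 1.1.1.1
```

The design mirrors what the tests check: operation code never constructs the toolbelt itself, it simply declares `toolbelt=True` and receives a ready-made helper alongside its context.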