ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject [1/3] incubator-ariatosca git commit: wip2
Date Sun, 13 Nov 2016 16:30:19 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-9-API-for-operation-context b71174f26 -> c3c0aa9b0


wip2


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/84d48402
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/84d48402
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/84d48402

Branch: refs/heads/ARIA-9-API-for-operation-context
Commit: 84d48402ab11601c99c0fa1bce63a5576c2df1d6
Parents: b71174f
Author: mxmrlv <mxmrlv@gmail.com>
Authored: Thu Nov 10 19:10:17 2016 +0200
Committer: mxmrlv <mxmrlv@gmail.com>
Committed: Thu Nov 10 19:10:17 2016 +0200

----------------------------------------------------------------------
 aria/context/__init__.py                        | 107 +---------------
 aria/context/common.py                          | 121 +++++++++++++++++++
 aria/context/operation.py                       |   7 +-
 aria/context/toolbelt.py                        |  56 +++++++++
 aria/context/workflow.py                        |   2 +-
 aria/decorators.py                              |  11 +-
 aria/storage/models.py                          |  11 +-
 aria/workflows/core/task.py                     |  39 ++++--
 aria/workflows/executor/multiprocess.py         |   2 +-
 aria/workflows/executor/thread.py               |   2 +-
 tests/context/__init__.py                       |  56 +++++++++
 tests/context/operation.py                      | 121 +++++++++++++++++++
 tests/context/toolbelt.py                       |  69 +++++++++++
 tests/mock/context.py                           |   1 +
 tests/mock/models.py                            |   2 +-
 tests/test_context.py                           |  56 ---------
 tests/workflows/core/test_executor.py           |   6 +-
 tests/workflows/core/test_task.py               | 105 ++++++++--------
 .../test_task_graph_into_exececution_graph.py   |   2 +-
 tests/workflows/test_engine.py                  |  24 ++--
 20 files changed, 547 insertions(+), 253 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/context/__init__.py
----------------------------------------------------------------------
diff --git a/aria/context/__init__.py b/aria/context/__init__.py
index aa68a36..fa5fbda 100644
--- a/aria/context/__init__.py
+++ b/aria/context/__init__.py
@@ -16,109 +16,6 @@
 """
 Provides contexts to workflow and operation
 """
-from uuid import uuid4
 
-from .. import logger
-from ..tools.lru_cache import lru_cache
-
-
-class BaseContext(logger.LoggerMixin):
-    """
-    Context object used during workflow creation and execution
-    """
-
-    def __init__(
-            self,
-            name,
-            model_storage,
-            resource_storage,
-            deployment_id,
-            workflow_id,
-            execution_id=None,
-            parameters=None,
-            **kwargs):
-        super(BaseContext, self).__init__(**kwargs)
-        self.name = name
-        self.id = str(uuid4())
-        self.model = model_storage
-        self.resource = resource_storage
-        self.deployment_id = deployment_id
-        self.workflow_id = workflow_id
-        self.execution_id = execution_id or str(uuid4())
-        self.parameters = parameters or {}
-
-    def __repr__(self):
-        return (
-            '{name}(deployment_id={self.deployment_id}, '
-            'workflow_id={self.workflow_id}, '
-            'execution_id={self.execution_id})'.format(
-                name=self.__class__.__name__, self=self))
-
-    @property
-    def blueprint_id(self):
-        """
-        The blueprint id
-        """
-        return self.deployment.blueprint_id
-
-    @property
-    @lru_cache()
-    def blueprint(self):
-        """
-        The blueprint model
-        """
-        return self.model.blueprint.get(self.blueprint_id)
-
-    @property
-    @lru_cache()
-    def deployment(self):
-        """
-        The deployment model
-        """
-        return self.model.deployment.get(self.deployment_id)
-
-    @property
-    def execution(self):
-        """
-        The execution model
-        """
-        return self.model.execution.get(self.execution_id)
-
-    @execution.setter
-    def execution(self, value):
-        """
-        Store the execution in the model storage
-        """
-        self.model.execution.store(value)
-
-    def download_blueprint_resource(self, destination, path=None):
-        """
-        Download a blueprint resource from the resource storage
-        """
-        return self.resource.blueprint.download(
-            entry_id=self.blueprint_id,
-            destination=destination,
-            path=path)
-
-    def download_deployment_resource(self, destination, path=None):
-        """
-        Download a deployment resource from the resource storage
-        """
-        return self.resource.deployment.download(
-            entry_id=self.deployment_id,
-            destination=destination,
-            path=path)
-
-    @lru_cache()
-    def get_deployment_resource_data(self, path=None):
-        """
-        Read a deployment resource as string from the resource storage
-        """
-        return self.resource.deployment.data(entry_id=self.deployment_id, path=path)
-
-    @lru_cache()
-    def get_blueprint_resource_data(self, path=None):
-        """
-        Read a blueprint resource as string from the resource storage
-        """
-        return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path)
+from . import workflow, operation
+from .toolbelt import tool_belt

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/context/common.py
----------------------------------------------------------------------
diff --git a/aria/context/common.py b/aria/context/common.py
new file mode 100644
index 0000000..9f69b46
--- /dev/null
+++ b/aria/context/common.py
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from uuid import uuid4
+
+from .. import logger
+from ..tools.lru_cache import lru_cache
+
+
+class BaseContext(logger.LoggerMixin):
+    """
+    Context object used during workflow creation and execution
+    """
+
+    def __init__(
+            self,
+            name,
+            model_storage,
+            resource_storage,
+            deployment_id,
+            workflow_id,
+            execution_id=None,
+            parameters=None,
+            **kwargs):
+        super(BaseContext, self).__init__(**kwargs)
+        self.name = name
+        self.id = str(uuid4())
+        self.model = model_storage
+        self.resource = resource_storage
+        self.deployment_id = deployment_id
+        self.workflow_id = workflow_id
+        self.execution_id = execution_id or str(uuid4())
+        self.parameters = parameters or {}
+
+    def __repr__(self):
+        return (
+            '{name}(deployment_id={self.deployment_id}, '
+            'workflow_id={self.workflow_id}, '
+            'execution_id={self.execution_id})'.format(
+                name=self.__class__.__name__, self=self))
+
+    @property
+    def blueprint_id(self):
+        """
+        The blueprint id
+        """
+        return self.deployment.blueprint_id
+
+    @property
+    @lru_cache()
+    def blueprint(self):
+        """
+        The blueprint model
+        """
+        return self.model.blueprint.get(self.blueprint_id)
+
+    @property
+    @lru_cache()
+    def deployment(self):
+        """
+        The deployment model
+        """
+        return self.model.deployment.get(self.deployment_id)
+
+    @property
+    def execution(self):
+        """
+        The execution model
+        """
+        return self.model.execution.get(self.execution_id)
+
+    @execution.setter
+    def execution(self, value):
+        """
+        Store the execution in the model storage
+        """
+        self.model.execution.store(value)
+
+    def download_blueprint_resource(self, destination, path=None):
+        """
+        Download a blueprint resource from the resource storage
+        """
+        return self.resource.blueprint.download(
+            entry_id=self.blueprint_id,
+            destination=destination,
+            path=path)
+
+    def download_deployment_resource(self, destination, path=None):
+        """
+        Download a deployment resource from the resource storage
+        """
+        return self.resource.deployment.download(
+            entry_id=self.deployment_id,
+            destination=destination,
+            path=path)
+
+    @lru_cache()
+    def get_deployment_resource_data(self, path=None):
+        """
+        Read a deployment resource as string from the resource storage
+        """
+        return self.resource.deployment.data(entry_id=self.deployment_id, path=path)
+
+    @lru_cache()
+    def get_blueprint_resource_data(self, path=None):
+        """
+        Read a blueprint resource as string from the resource storage
+        """
+        return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/context/operation.py b/aria/context/operation.py
index 13764d8..908287d 100644
--- a/aria/context/operation.py
+++ b/aria/context/operation.py
@@ -18,7 +18,7 @@ Workflow and operation contexts
 """
 
 
-from . import BaseContext
+from .common import BaseContext
 
 
 class BaseOperationContext(BaseContext):
@@ -36,6 +36,7 @@ class BaseOperationContext(BaseContext):
             execution_id=workflow_context.execution_id,
             parameters=workflow_context.parameters,
             **kwargs)
+        self._workflow_context = workflow_context
         self._task_model = task_context
 
     def __repr__(self):
@@ -45,6 +46,10 @@ class BaseOperationContext(BaseContext):
         return '{name}({0})'.format(details, name=self.name)
 
     @property
+    def workflow(self):
+        return self._workflow_context
+
+    @property
     def task(self):
         return self._task_model
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/context/toolbelt.py
----------------------------------------------------------------------
diff --git a/aria/context/toolbelt.py b/aria/context/toolbelt.py
new file mode 100644
index 0000000..59e4d3c
--- /dev/null
+++ b/aria/context/toolbelt.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from contextlib import contextmanager
+
+from . import operation
+
+
+class _ToolBelt(object):
+
+    def __init__(self):
+        self._op_context = None
+        self._workflow_context = None
+
+    @property
+    def node_instances_connected_to_me(self):
+        assert isinstance(self._op_context, operation.NodeOperationContext)
+        for node_instance in self._workflow_context.node_instances:
+            for relationship_instance in node_instance.relationship_instances:
+                if relationship_instance.target_id == self._op_context.node_instance.id:
+                    yield node_instance
+
+    @property
+    def relationships_to_me(self):
+        assert isinstance(self._op_context, operation.NodeOperationContext)
+        for node_instance in self._workflow_context.node_instances:
+            for relationship_instance in node_instance.relationship_instances:
+                if relationship_instance.target_id == self._op_context.node_instance.id:
+                    yield relationship_instance
+
+    @contextmanager
+    def use(self, operation_context):
+        assert isinstance(operation_context, operation.BaseOperationContext)
+        self._op_context = operation_context
+        self._workflow_context = operation_context.workflow
+        try:
+            yield self
+        finally:
+            self._op_context = None
+            self._workflow_context = None
+
+_toolbelt = _ToolBelt()
+
+
+tool_belt = _toolbelt.use

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/context/workflow.py b/aria/context/workflow.py
index ef2feb6..0b5b2a8 100644
--- a/aria/context/workflow.py
+++ b/aria/context/workflow.py
@@ -22,7 +22,7 @@ from contextlib import contextmanager
 
 from aria import exceptions
 
-from . import BaseContext
+from .common import BaseContext
 
 
 class ContextException(exceptions.AriaError):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/decorators.py
----------------------------------------------------------------------
diff --git a/aria/decorators.py b/aria/decorators.py
index e41ed81..d826c2d 100644
--- a/aria/decorators.py
+++ b/aria/decorators.py
@@ -50,17 +50,20 @@ def workflow(func=None, suffix_template=''):
     return _wrapper
 
 
-def operation(func=None, suffix_template=''):
+def operation(func=None, tool_belt=False, suffix_template=''):
     """
     Operation decorator
     """
     if func is None:
-        return partial(operation, suffix_template=suffix_template)
+        return partial(operation, suffix_template=suffix_template, tool_belt=tool_belt)
 
     @wraps(func)
     def _wrapper(**func_kwargs):
-        validate_function_arguments(func, func_kwargs)
-        return func(**func_kwargs)
+        with context.tool_belt(func_kwargs.get('ctx')) as tb:
+            if tool_belt:
+                func_kwargs.setdefault('tool_belt', tb)
+            validate_function_arguments(func, func_kwargs)
+            return func(**func_kwargs)
     return _wrapper
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index 686ca42..3b36818 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -263,7 +263,11 @@ class Node(Model):
         # todo: maybe add here Exception if isn't exists (didn't yield one's)
 
 
-class RelationshipInstance(Model):
+class Instance(Model):
+    id = Field(type=basestring, default=uuid_generator)
+
+
+class RelationshipInstance(Instance):
     """
     A Model which represents a relationship instance
     """
@@ -274,7 +278,7 @@ class RelationshipInstance(Model):
     relationship = PointerField(type=Relationship)
 
 
-class NodeInstance(Model):
+class NodeInstance(Instance):
     """
     A Model which represents a node instance
     """
@@ -401,6 +405,5 @@ class Task(Model):
     # Operation specific fields
     name = Field(type=basestring)
     operation_details = Field(type=dict)
-    # TODO: this might need to be remodeled
-    operation_container = PointerField(type=NodeInstance)
+    operation_container = Field()
     inputs = Field(type=dict, default=lambda: {})

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py
index 4fb3766..075ef8a 100644
--- a/aria/workflows/core/task.py
+++ b/aria/workflows/core/task.py
@@ -18,6 +18,8 @@ Workflow tasks
 """
 from contextlib import contextmanager
 from datetime import datetime
+from functools import partial
+from functools import wraps
 
 from ... import logger
 from ...storage import models
@@ -25,6 +27,18 @@ from ...context import operation as operation_context
 from .. import exceptions
 
 
+def _locked(func=None):
+    if func is None:
+        return partial(_locked, func=_locked)
+
+    @wraps(func)
+    def _wrapper(self, value, **kwargs):
+        if self._update_fields is None:
+            raise exceptions.TaskException("Task is not in update mode")
+        return func(self, value, **kwargs)
+    return _wrapper
+
+
 class BaseTask(logger.LoggerMixin):
     """
     Base class for Task objects
@@ -97,7 +111,7 @@ class OperationTask(BaseTask):
             operation_container=api_task.operation_container,
             inputs=api_task.inputs,
             status=task_model.PENDING,
-            execution_id=self.workflow_context.execution.id,
+            execution_id=self.workflow_context.execution_id,
             max_retries=self.workflow_context.parameters.get('max_retries', 1),
         )
 
@@ -164,9 +178,8 @@ class OperationTask(BaseTask):
         return self.model_context.status
 
     @status.setter
+    @_locked
     def status(self, value):
-        if self._update_fields is None:
-            raise exceptions.TaskException("Task is not in update mode")
         self._update_fields['status'] = value
 
     @property
@@ -178,9 +191,8 @@ class OperationTask(BaseTask):
         return self.model_context.started_at
 
     @started_at.setter
+    @_locked
     def started_at(self, value):
-        if self._update_fields is None:
-            raise exceptions.TaskException("Task is not in update mode")
         self._update_fields['started_at'] = value
 
     @property
@@ -192,9 +204,8 @@ class OperationTask(BaseTask):
         return self.model_context.ended_at
 
     @ended_at.setter
+    @_locked
     def ended_at(self, value):
-        if self._update_fields is None:
-            raise exceptions.TaskException("Task is not in update mode")
         self._update_fields['ended_at'] = value
 
     @property
@@ -206,13 +217,23 @@ class OperationTask(BaseTask):
         return self.model_context.retry_count
 
     @retry_count.setter
+    @_locked
     def retry_count(self, value):
-        if self._update_fields is None:
-            raise exceptions.TaskException("Task is not in update mode")
         self._update_fields['retry_count'] = value
 
+    @property
+    def eta(self):
+        return self.model_context.eta
+
+    @eta.setter
+    @_locked
+    def eta(self, value):
+        self._update_fields['eta'] = value
+
     def __getattr__(self, attr):
         try:
             return getattr(self.model_context, attr)
         except AttributeError:
             return super(OperationTask, self).__getattribute__(attr)
+
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/multiprocess.py b/aria/workflows/executor/multiprocess.py
index 063f834..c6423b6 100644
--- a/aria/workflows/executor/multiprocess.py
+++ b/aria/workflows/executor/multiprocess.py
@@ -46,7 +46,7 @@ class MultiprocessExecutor(BaseExecutor):
         self._tasks[task.id] = task
         self._pool.apply_async(_multiprocess_handler, args=(
             self._queue,
-            task.model_context,
+            task.context,
             task.id,
             task.operation_details,
             task.inputs))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/aria/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/thread.py b/aria/workflows/executor/thread.py
index b2dc278..a6b616b 100644
--- a/aria/workflows/executor/thread.py
+++ b/aria/workflows/executor/thread.py
@@ -56,7 +56,7 @@ class ThreadExecutor(BaseExecutor):
                 self._task_started(task)
                 try:
                     task_func = module.load_attribute(task.operation_details['operation'])
-                    task_func(ctx=task.model_context, **task.inputs)
+                    task_func(ctx=task.context, **task.inputs)
                     self._task_succeeded(task)
                 except BaseException as e:
                     self._task_failed(task, exception=e)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/context/__init__.py b/tests/context/__init__.py
new file mode 100644
index 0000000..89f55cb
--- /dev/null
+++ b/tests/context/__init__.py
@@ -0,0 +1,56 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+import pytest
+from aria.workflows.executor import blocking
+from aria.workflows.core import engine
+
+from .. import mock
+
+
+global_test_holder = {}
+
+
+@pytest.fixture
+def executor():
+    # result = thread.ThreadExecutor()
+    result = blocking.CurrentThreadBlockingExecutor()
+    try:
+        yield result
+    finally:
+        result.close()
+
+
+@pytest.fixture
+def workflow_context():
+    return mock.context.simple()
+
+
+def op_path(func, module_path=None):
+    module_path = module_path or sys.modules[__name__].__name__
+    return '{0}.{1}'.format(module_path, func.__name__)
+
+
+def execute(workflow_func, workflow_context, executor):
+    graph = workflow_func(ctx=workflow_context)
+    eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph)
+    eng.execute()
+
+
+@pytest.fixture(autouse=True)
+def cleanup():
+    global_test_holder.clear()
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/context/operation.py
----------------------------------------------------------------------
diff --git a/tests/context/operation.py b/tests/context/operation.py
new file mode 100644
index 0000000..7422ce6
--- /dev/null
+++ b/tests/context/operation.py
@@ -0,0 +1,121 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+from aria import workflow, operation
+from aria.workflows import api
+
+from .. import mock
+from . import op_path, execute, executor, workflow_context, global_test_holder
+
+
+def test_node_operation_task_execution(workflow_context, executor):
+    node = mock.models.get_dependency_node()
+    node_instance = mock.models.get_dependency_node_instance(node)
+    workflow_context.model.node.store(node)
+    workflow_context.model.node_instance.store(node_instance)
+
+    node_instance = workflow_context.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
+    name = 'op_name'
+    operation_details = {'operation': op_path(foo, module_path=sys.modules[__name__].__name__)}
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask(
+                name=name,
+                operation_details=operation_details,
+                operation_container=node_instance,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
+
+    operation_value = global_test_holder[name]
+
+    # operation container based attributes
+    for key, value in node_instance.fields_dict.items():
+        assert getattr(operation_value.operation_container, key) == value
+
+    # Task bases assertions
+    assert operation_value.task.operation_container == node_instance
+    assert operation_value.task.name == name
+    assert operation_value.task.operation_details == operation_details
+    assert operation_value.task.inputs == inputs
+
+    # Context based attributes (sugaring)
+    assert operation_value.node == node_instance.node
+    assert operation_value.node_instance == node_instance
+
+
+def test_relationship_operation_task_execution(workflow_context, executor):
+    dependency_node = mock.models.get_dependency_node()
+    dependency_node_instance = mock.models.get_dependency_node_instance()
+    relationship = mock.models.get_relationship(dependency_node)
+    relationship_instance = mock.models.get_relationship_instance(dependency_node_instance,
+                                                                  relationship)
+    dependent_node = mock.models.get_dependent_node()
+    dependent_node_instance = mock.models.get_dependent_node_instance(relationship_instance,
+                                                                      dependency_node)
+    workflow_context.model.node.store(dependency_node)
+    workflow_context.model.node_instance.store(dependency_node_instance)
+    workflow_context.model.relationship.store(relationship)
+    workflow_context.model.relationship_instance.store(relationship_instance)
+    workflow_context.model.node.store(dependent_node)
+    workflow_context.model.node_instance.store(dependent_node_instance)
+
+    name = 'op_name'
+    operation_details = {'operation': op_path(foo, module_path=sys.modules[__name__].__name__)}
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask(
+                name=name,
+                operation_details=operation_details,
+                operation_container=relationship_instance,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
+
+    operation_value = global_test_holder[name]
+
+    # operation container based attributes
+    for key, value in relationship_instance.fields_dict.items():
+        assert getattr(operation_value.operation_container, key) == value
+
+    # Task bases assertions
+    assert operation_value.task.operation_container == relationship_instance
+    assert operation_value.task.name == name
+    assert operation_value.task.operation_details == operation_details
+    assert operation_value.task.inputs == inputs
+
+    # Context based attributes (sugaring)
+    assert operation_value.target_node == dependency_node
+    assert operation_value.target_node_instance == dependency_node_instance
+    assert operation_value.relationship == relationship
+    assert operation_value.relationship_instance == relationship_instance
+
+
+@operation
+def foo(ctx, **_):
+    global_test_holder[ctx.name] = ctx
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/context/toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/context/toolbelt.py b/tests/context/toolbelt.py
new file mode 100644
index 0000000..b46fcad
--- /dev/null
+++ b/tests/context/toolbelt.py
@@ -0,0 +1,69 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+from aria.workflows import api
+from aria import workflow, operation
+
+from .. import mock
+from . import op_path, execute, executor, workflow_context, global_test_holder
+
+
+def test_operation_tool_belt(workflow_context, executor):
+
+    dependency_node = mock.models.get_dependency_node()
+    dependency_node_instance = mock.models.get_dependency_node_instance()
+    relationship = mock.models.get_relationship(dependency_node)
+    relationship_instance = mock.models.get_relationship_instance(dependency_node_instance,
+                                                                  relationship)
+    dependent_node = mock.models.get_dependent_node()
+    dependent_node_instance = mock.models.get_dependent_node_instance(relationship_instance,
+                                                                      dependency_node)
+    workflow_context.model.node.store(dependency_node)
+    workflow_context.model.node_instance.store(dependency_node_instance)
+    workflow_context.model.relationship.store(relationship)
+    workflow_context.model.relationship_instance.store(relationship_instance)
+    workflow_context.model.node.store(dependent_node)
+    workflow_context.model.node_instance.store(dependent_node_instance)
+
+    node_instance = workflow_context.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
+    name = 'op_name'
+    operation_details = {'operation': op_path(foo, module_path=sys.modules[__name__].__name__)}
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask(
+                name=name,
+                operation_details=operation_details,
+                operation_container=node_instance,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
+
+    assert list(global_test_holder.get('connected_to_me', [])) == list([dependent_node_instance])
+    assert list(global_test_holder.get('relationships_to_me', [])) == list([relationship_instance])
+
+
+@operation(tool_belt=True)
+def foo(ctx, tool_belt, **_):
+    global_test_holder['connected_to_me'] = list(tool_belt.node_instances_connected_to_me)
+    global_test_holder['relationships_to_me'] = list(tool_belt.relationships_to_me)
+    global_test_holder[ctx.name] = tool_belt
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index a89612e..4b1f34b 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -22,6 +22,7 @@ from ..storage import InMemoryModelDriver
 def simple():
     storage = application_model_storage(InMemoryModelDriver())
     storage.setup()
+    storage.deployment.store(models.get_deployment())
     return context.workflow.WorkflowContext(
         name='simple_context',
         model_storage=storage,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index bf40039..66a902d 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -62,7 +62,7 @@ def get_dependency_node_instance(dependency_node=None):
 
 def get_relationship(target=None):
     return models.Relationship(
-        target_id=target.id or get_dependency_node().id,
+        target_id=target.id if target is not None else get_dependency_node().id,
         source_interfaces={},
         source_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS),
         target_interfaces={},

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/test_context.py
----------------------------------------------------------------------
diff --git a/tests/test_context.py b/tests/test_context.py
deleted file mode 100644
index b61c5c6..0000000
--- a/tests/test_context.py
+++ /dev/null
@@ -1,56 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import pytest
-
-from aria.context import workflow, operation
-
-from . import mock
-
-
-@pytest.fixture
-def ctx():
-    simple_context = mock.context.simple()
-
-    blueprint = mock.models.get_blueprint()
-    deployment = mock.models.get_deployment()
-    node = mock.models.get_dependency_node()
-    node_instance = mock.models.get_dependency_node_instance(node)
-    execution = mock.models.get_execution()
-
-    simple_context.model.blueprint.store(blueprint)
-    simple_context.model.deployment.store(deployment)
-    simple_context.model.node.store(node)
-    simple_context.model.node_instance.store(node_instance)
-    simple_context.model.execution.store(execution)
-
-    return simple_context
-
-
-class TestOperationContext(object):
-
-    def test_node_operation(self, ctx):
-        node = ctx.model.node.get(mock.models.DEPENDENCY_NODE_ID)
-        node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
-        with workflow.current.push(ctx):
-            operation_context = operation.NodeOperationContext(
-                name='test_operation_name',
-                operation_details={'dits': True},
-                operation_container=node_instance,
-                inputs={'inputs': True},
-            )
-        # operation host attributes
-        assert operation_context.node == node
-        assert operation_context.node_instance == node_instance
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/workflows/core/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_executor.py b/tests/workflows/core/test_executor.py
index 3445347..91213e5 100644
--- a/tests/workflows/core/test_executor.py
+++ b/tests/workflows/core/test_executor.py
@@ -81,15 +81,15 @@ class TestExecutor(object):
             self.executor.close()
 
 
-def mock_successful_task(ctx):
+def mock_successful_task(**_):
     pass
 
 
-def mock_failing_task(ctx):
+def mock_failing_task(**_):
     raise MockException
 
 
-def mock_task_with_input(ctx, input):
+def mock_task_with_input(input, **_):
     raise MockException(input)
 
 if app:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_task.py b/tests/workflows/core/test_task.py
index 7731165..451bce3 100644
--- a/tests/workflows/core/test_task.py
+++ b/tests/workflows/core/test_task.py
@@ -12,24 +12,19 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import pytest
+from datetime import datetime, timedelta
 
-from aria import decorators
 from aria.context import workflow as workflow_context
 from aria.workflows import (
     api,
     core,
+    exceptions,
 )
-from aria.workflows.executor import blocking
-from aria.storage import models
 
 from ... import mock
 
 
-global_test_holder = {}
-
-
 @pytest.fixture
 def ctx():
     simple_context = mock.context.simple()
@@ -49,16 +44,9 @@ def ctx():
     return simple_context
 
 
-@pytest.fixture(autouse=True)
-def clear_global_test_operation_holder():
-    global_test_holder.clear()
-
-
 class TestOperationTask(object):
 
-    def test_operation_task_creation(self, ctx):
-        node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
-
+    def _create_operation_task(self, ctx, node_instance):
         with workflow_context.current.push(ctx):
             api_task = api.task.OperationTask(
                 name='ripe',
@@ -68,50 +56,55 @@ class TestOperationTask(object):
 
             core_task = core.task.OperationTask(api_task=api_task)
 
-            storage_task = ctx.model.task.get(core_task.id)
+        return api_task, core_task
 
-            assert core_task.model_context == storage_task
-            assert core_task.name == api_task.name
-            assert core_task.operation_details == api_task.operation_details
-            assert core_task.operation_container == api_task.operation_container == node_instance
-            assert core_task.inputs == api_task.inputs == storage_task.inputs
-
-    def test_operation_task_execution(self, ctx):
+    def test_operation_task_creation(self, ctx):
         node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
+        api_task, core_task = self._create_operation_task(ctx, node_instance)
+        storage_task = ctx.model.task.get(core_task.id)
 
-        operation_name = 'silly_operation'
-        operation_details = {'operation': 'tests.workflows.core.test_task.foo'}
-        inputs = {'putput': True}
-
-        @decorators.workflow
-        def basic_workflow(graph, **_):
-            graph.add_tasks(
-                api.task.OperationTask(
-                    name=operation_name,
-                    operation_details=operation_details,
-                    operation_container=node_instance,
-                    inputs=inputs,
-                )
-            )
-
-        engine = core.engine.Engine(executor=blocking.CurrentThreadBlockingExecutor(),
-                                    workflow_context=ctx,
-                                    tasks_graph=basic_workflow(ctx=ctx))
-        engine.execute()
-        operation_value = global_test_holder[operation_name]
+        assert core_task.model_context == storage_task
+        assert core_task.name == api_task.name
+        assert core_task.operation_details == api_task.operation_details
+        assert core_task.operation_container == api_task.operation_container == node_instance
+        assert core_task.inputs == api_task.inputs == storage_task.inputs
 
-        assert operation_value.node_instance == node_instance
-        assert operation_value.node == node_instance.node
-        # operation_host attributes
-        for key, value in node_instance.fields_dict.items():
-            assert getattr(operation_value.operation_container, key) == value
-
-        assert operation_value.task.operation_container == node_instance
-        assert operation_value.task.name == operation_name
-        assert operation_value.task.operation_details == operation_details
-        assert operation_value.task.inputs == inputs
+    def test_operation_task_edit_locked_attribute(self, ctx):
+        node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
 
-@decorators.operation
-def foo(ctx, **_):
-    global_test_holder[ctx.name] = ctx
+        _, core_task = self._create_operation_task(ctx, node_instance)
+        now = datetime.utcnow()
+        with pytest.raises(exceptions.TaskException):
+            core_task.status = core_task.STARTED
+        with pytest.raises(exceptions.TaskException):
+            core_task.started_at = now
+        with pytest.raises(exceptions.TaskException):
+            core_task.ended_at = now
+        with pytest.raises(exceptions.TaskException):
+            core_task.retry_count = 2
+        with pytest.raises(exceptions.TaskException):
+            core_task.eta = now
+
+    def test_operation_task_edit_attributes(self, ctx):
+        node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
 
+        _, core_task = self._create_operation_task(ctx, node_instance)
+        future_time = datetime.utcnow() + timedelta(seconds=3)
+
+        with core_task.update():
+            core_task.status = core_task.STARTED
+            core_task.started_at = future_time
+            core_task.ended_at = future_time
+            core_task.retry_count = 2
+            core_task.eta = future_time
+            assert core_task.status != core_task.STARTED
+            assert core_task.started_at != future_time
+            assert core_task.ended_at != future_time
+            assert core_task.retry_count != 2
+            assert core_task.eta != future_time
+
+        assert core_task.status == core_task.STARTED
+        assert core_task.started_at == future_time
+        assert core_task.ended_at == future_time
+        assert core_task.retry_count == 2
+        assert core_task.eta == future_time

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/workflows/core/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py
index 992b0cc..170fbdb 100644
--- a/tests/workflows/core/test_task_graph_into_exececution_graph.py
+++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py
@@ -93,7 +93,7 @@ def _assert_execution_is_api_task(execution_task, api_task):
     assert execution_task.id == api_task.id
     assert execution_task.name == api_task.name
     assert execution_task.operation_details == api_task.operation_details
-    assert execution_task.operation_host == api_task.operation_host
+    assert execution_task.operation_container == api_task.operation_container
     assert execution_task.inputs == api_task.inputs
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/84d48402/tests/workflows/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py
index f016d12..72c70e5 100644
--- a/tests/workflows/test_engine.py
+++ b/tests/workflows/test_engine.py
@@ -19,11 +19,10 @@ import pytest
 
 import aria
 from aria import events
-from aria import workflow
+from aria import workflow, operation
 from aria import context
 from aria.storage import models
 from aria.workflows import exceptions
-from aria.workflows.executor import thread
 from aria.workflows.core import engine
 from aria.workflows import api
 
@@ -95,11 +94,11 @@ class TestEngine(object):
         eng.execute()
 
     @staticmethod
-    def _op(func, ctx, inputs=None):
+    def _op(func, ctx, inputs=None, task_name=None):
         return api.task.OperationTask(
-            name='task',
-            operation_details={'operation': 'tests.workflows.test_engine.{name}'.format(
-                name=func.__name__)},
+            name=task_name or 'task',
+            operation_details={'operation': 'tests.workflows.test_engine.{name}'.
+                               format(name=func.__name__)},
             operation_container=ctx.model.node_instance.get('dependency_node_instance'),
             inputs=inputs
         )
@@ -141,7 +140,9 @@ class TestEngine(object):
 
     @pytest.fixture(scope='function')
     def executor(self):
-        result = thread.ThreadExecutor()
+        from aria.workflows.executor import blocking
+        result = blocking.CurrentThreadBlockingExecutor()
+        # result = thread.ThreadExecutor()
         try:
             yield result
         finally:
@@ -174,14 +175,17 @@ class TestEngine(object):
         return result
 
 
-def mock_success_task():
+@operation
+def mock_success_task(**_):
     pass
 
 
-def mock_failed_task():
+@operation
+def mock_failed_task(**_):
     raise RuntimeError
 
 
-def mock_ordered_task(counter):
+@operation
+def mock_ordered_task(counter, **_):
     invocations = global_test_holder.setdefault('invocations', [])
     invocations.append(counter)



Mime
View raw message