ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: ARIATOSCA-9-API-for-operation-context [Forced Update!]
Date Mon, 14 Nov 2016 10:17:50 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-9-API-for-operation-context 7fbf6c669 -> 1cea7ef8b (forced update)


ARIATOSCA-9-API-for-operation-context


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/1cea7ef8
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/1cea7ef8
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/1cea7ef8

Branch: refs/heads/ARIA-9-API-for-operation-context
Commit: 1cea7ef8b9894e9c10c58f94a90713e36966fd15
Parents: eb3dd5d
Author: mxmrlv <mxmrlv@gmail.com>
Authored: Thu Oct 27 20:39:09 2016 +0300
Committer: mxmrlv <mxmrlv@gmail.com>
Committed: Mon Nov 14 12:17:39 2016 +0200

----------------------------------------------------------------------
 aria/context/__init__.py                        |   1 +
 aria/context/common.py                          | 137 ++++++++++++++++
 aria/context/operation.py                       | 121 ++++++++++----
 aria/context/toolbelt.py                        | 102 ++++++++++++
 aria/context/workflow.py                        | 125 ++-------------
 aria/decorators.py                              |  40 ++---
 aria/logger.py                                  |   2 +-
 aria/storage/models.py                          |   5 +-
 aria/storage/structures.py                      |   1 -
 aria/workflows/api/task.py                      |  13 +-
 aria/workflows/builtin/heal.py                  |   8 +-
 aria/workflows/builtin/workflows.py             |  65 +++-----
 aria/workflows/core/__init__.py                 |   2 +-
 aria/workflows/core/task.py                     |  92 +++++++----
 aria/workflows/executor/__init__.py             |   7 +
 aria/workflows/executor/blocking.py             |   2 +-
 aria/workflows/executor/celery.py               |   1 +
 aria/workflows/executor/multiprocess.py         |   5 +-
 aria/workflows/executor/thread.py               |   2 +-
 tests/context/__init__.py                       |  24 +++
 tests/context/test_operation.py                 | 158 +++++++++++++++++++
 tests/context/test_toolbelt.py                  | 142 +++++++++++++++++
 tests/mock/context.py                           |   1 +
 tests/mock/models.py                            |  48 ++++--
 tests/storage/test_models.py                    |   5 +-
 tests/workflows/api/test_task.py                |  28 ++--
 tests/workflows/core/test_engine.py             |  50 +++---
 tests/workflows/core/test_task.py               | 114 +++++++++++++
 .../test_task_graph_into_exececution_graph.py   |   6 +-
 tests/workflows/executor/test_executor.py       |   9 +-
 30 files changed, 1007 insertions(+), 309 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/context/__init__.py
----------------------------------------------------------------------
diff --git a/aria/context/__init__.py b/aria/context/__init__.py
index 20e19db..ad89b13 100644
--- a/aria/context/__init__.py
+++ b/aria/context/__init__.py
@@ -18,3 +18,4 @@ Provides contexts to workflow and operation
 """
 
 from . import workflow, operation
+from .toolbelt import toolbelt

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/context/common.py
----------------------------------------------------------------------
diff --git a/aria/context/common.py b/aria/context/common.py
new file mode 100644
index 0000000..5e0cff7
--- /dev/null
+++ b/aria/context/common.py
@@ -0,0 +1,137 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+A common context for both workflow and operation
+"""
+from uuid import uuid4
+
+from .. import logger
+from ..tools.lru_cache import lru_cache
+
+
+class BaseContext(logger.LoggerMixin):
+    """
+    Base context object for workflow and operation
+    """
+
+    def __init__(
+            self,
+            name,
+            model_storage,
+            resource_storage,
+            deployment_id,
+            workflow_id,
+            execution_id=None,
+            parameters=None,
+            task_max_attempts=1,
+            task_retry_interval=0,
+            **kwargs):
+        super(BaseContext, self).__init__(**kwargs)
+        self.name = name
+        self.id = str(uuid4())
+        self._model = model_storage
+        self._resource = resource_storage
+        self._deployment_id = deployment_id
+        self._workflow_id = workflow_id
+        self._execution_id = execution_id or str(uuid4())
+        self.parameters = parameters or {}
+        self.task_max_attempts = task_max_attempts
+        self.task_retry_interval = task_retry_interval
+
+    def __repr__(self):
+        return (
+            '{name}(name={self.name}, '
+            'deployment_id={self._deployment_id}, '
+            'workflow_id={self._workflow_id}, '
+            'execution_id={self._execution_id})'
+            .format(name=self.__class__.__name__, self=self))
+
+    @property
+    def model(self):
+        """
+        Access to the model storage
+        :return:
+        """
+        return self._model
+
+    @property
+    def resource(self):
+        """
+        Access to the resource storage
+        :return:
+        """
+        return self._resource
+
+    @property
+    @lru_cache()
+    def blueprint(self):
+        """
+        The blueprint model
+        """
+        return self.model.blueprint.get(self.deployment.blueprint_id)
+
+    @property
+    @lru_cache()
+    def deployment(self):
+        """
+        The deployment model
+        """
+        return self.model.deployment.get(self._deployment_id)
+
+    @property
+    def execution(self):
+        """
+        The execution model
+        """
+        return self.model.execution.get(self._execution_id)
+
+    @execution.setter
+    def execution(self, value):
+        """
+        Store the execution in the model storage
+        """
+        self.model.execution.store(value)
+
+    def download_blueprint_resource(self, destination, path=None):
+        """
+        Download a blueprint resource from the resource storage
+        """
+        return self.resource.blueprint.download(
+            entry_id=self.blueprint.id,
+            destination=destination,
+            path=path)
+
+    def download_deployment_resource(self, destination, path=None):
+        """
+        Download a deployment resource from the resource storage
+        """
+        return self.resource.deployment.download(
+            entry_id=self._deployment_id,
+            destination=destination,
+            path=path)
+
+    @lru_cache()
+    def get_deployment_resource_data(self, path=None):
+        """
+        Read a deployment resource as string from the resource storage
+        """
+        return self.resource.deployment.data(entry_id=self._deployment_id, path=path)
+
+    @lru_cache()
+    def get_blueprint_resource_data(self, path=None):
+        """
+        Read a blueprint resource as string from the resource storage
+        """
+        return self.resource.blueprint.data(entry_id=self._deployment_id, path=path)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/context/operation.py b/aria/context/operation.py
index d4d229a..e0e191d 100644
--- a/aria/context/operation.py
+++ b/aria/context/operation.py
@@ -17,52 +17,113 @@
 Workflow and operation contexts
 """
 
-from uuid import uuid4
 
-from aria.logger import LoggerMixin
+from .common import BaseContext
 
-class OperationContext(LoggerMixin):
+
+class BaseOperationContext(BaseContext):
     """
     Context object used during operation creation and execution
     """
 
-    def __init__(
-            self,
-            name,
-            operation_details,
-            workflow_context,
-            node_instance,
-            inputs=None):
-        super(OperationContext, self).__init__()
-        self.name = name
-        self.id = str(uuid4())
-        self.operation_details = operation_details
-        self.workflow_context = workflow_context
-        self.node_instance = node_instance
-        self.inputs = inputs or {}
+    def __init__(self, name, workflow_context, task, **kwargs):
+        super(BaseOperationContext, self).__init__(
+            name=name,
+            model_storage=workflow_context.model,
+            resource_storage=workflow_context.resource,
+            deployment_id=workflow_context._deployment_id,
+            workflow_id=workflow_context._workflow_id,
+            execution_id=workflow_context._execution_id,
+            parameters=workflow_context.parameters,
+            **kwargs)
+        self._workflow_context = workflow_context
+        self._task_model = task
+        self._actor = self.task.actor
 
     def __repr__(self):
         details = ', '.join(
             '{0}={1}'.format(key, value)
-            for key, value in self.operation_details.items())
+            for key, value in self.task.operation_details.items())
         return '{name}({0})'.format(details, name=self.name)
 
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.workflow_context, attr)
-        except AttributeError:
-            return super(OperationContext, self).__getattribute__(attr)
+    @property
+    def task(self):
+        """
+        The task in the model storage
+        :return: Task model
+        """
+        return self._task_model
+
+
+class NodeOperationContext(BaseOperationContext):
+    """
+    Context for node based operations.
+    """
+    @property
+    def node(self):
+        """
+        the node of the current operation
+        :return:
+        """
+        return self._actor.node
+
+    @property
+    def node_instance(self):
+        """
+        The node instance of the current operation
+        :return:
+        """
+        return self._actor
+
 
+class RelationshipOperationContext(BaseOperationContext):
+    """
+    Context for relationship based operations.
+    """
     @property
-    def operation(self):
+    def node(self):
         """
-        The model operation
+        The source node
+        :return:
         """
-        return self.storage.operation.get(self.id)
+        return self.model.node.get(self.relationship.source_id)
 
-    @operation.setter
-    def operation(self, value):
+    @property
+    def node_instance(self):
+        """
+        The source node instance
+        :return:
+        """
+        return self.model.node_instance.get(self.relationship_instance.source_id)
+
+    @property
+    def target_node(self):
+        """
+        The target node
+        :return:
+        """
+        return self.model.node.get(self.relationship.target_id)
+
+    @property
+    def target_node_instance(self):
+        """
+        The target node instance
+        :return:
+        """
+        return self.model.node_instance.get(self._actor.target_id)
+
+    @property
+    def relationship(self):
+        """
+        The relationship of the current operation
+        :return:
+        """
+        return self._actor.relationship
+
+    @property
+    def relationship_instance(self):
         """
-        Store the operation in the model storage
+        The relationship instance of the current operation
+        :return:
         """
-        self.storage.operation.store(value)
+        return self._actor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/context/toolbelt.py
----------------------------------------------------------------------
diff --git a/aria/context/toolbelt.py b/aria/context/toolbelt.py
new file mode 100644
index 0000000..56d594e
--- /dev/null
+++ b/aria/context/toolbelt.py
@@ -0,0 +1,102 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+provides with different tools when working with the operation.
+"""
+
+from contextlib import contextmanager
+
+from .. import exceptions
+from . import operation
+
+
+class _BaseToolbelt(object):
+    """
+    Base tool belt
+    """
+    def __init__(self):
+        self._op_context = None
+        self._workflow_context = None
+
+    @contextmanager
+    def use(self, operation_context):
+        """
+        Context manager which switches the current toolbelt with the supplied ctx
+        :param operation_context:
+        :return:
+        """
+        assert isinstance(operation_context, operation.BaseOperationContext)
+        _op_context = self._op_context
+        _workflow_context = self._workflow_context
+
+        self._op_context = operation_context
+        self._workflow_context = operation_context._workflow_context
+        try:
+            yield self
+        finally:
+            self._op_context = _op_context
+            self._workflow_context = _workflow_context
+
+
+class _NodeToolBelt(_BaseToolbelt):
+    """
+    Node operation related tool belt
+    """
+    @property
+    def relationships_to_me(self):
+        """
+        Any relationship which the current node is the target of
+        :return:
+        """
+        assert isinstance(self._op_context, operation.NodeOperationContext)
+        for node_instance in self._workflow_context.node_instances:
+            for relationship_instance in node_instance.relationship_instances:
+                if relationship_instance.target_id == self._op_context.node_instance.id:
+                    yield relationship_instance
+
+    @property
+    def host_ip(self):
+        """
+        The host ip of the current node
+        :return:
+        """
+        assert isinstance(self._op_context, operation.NodeOperationContext)
+        host_id = self._op_context._actor.host_id
+        host_instance = self._workflow_context.model.node_instance.get(host_id)
+        return host_instance.runtime_properties.get('ip')
+
+
+class _RelationshipToolBelt(_BaseToolbelt):
+    """
+    Relationship operation related tool belt
+    """
+    pass
+
+_operation_toolbelt = _NodeToolBelt()
+_relationship_toolbelt = _RelationshipToolBelt()
+
+
+def toolbelt(operation_context):
+    """
+    Get a toolbelt according to the current operation executor
+    :param operation_context:
+    :return:
+    """
+    if isinstance(operation_context, operation.NodeOperationContext):
+        return _operation_toolbelt.use(operation_context)
+    elif isinstance(operation_context, operation.RelationshipOperationContext):
+        return _relationship_toolbelt.use(operation_context)
+    else:
+        raise exceptions.TaskException("Operation context not supported")

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/context/workflow.py b/aria/context/workflow.py
index fb9c8ee..97bd7f9 100644
--- a/aria/context/workflow.py
+++ b/aria/context/workflow.py
@@ -18,12 +18,11 @@ Workflow and operation contexts
 """
 
 import threading
-from uuid import uuid4
 from contextlib import contextmanager
 
-from .. import logger
-from ..tools.lru_cache import lru_cache
-from .. import exceptions
+from aria import exceptions
+
+from .common import BaseContext
 
 
 class ContextException(exceptions.AriaError):
@@ -33,143 +32,51 @@ class ContextException(exceptions.AriaError):
     pass
 
 
-class WorkflowContext(logger.LoggerMixin):
+class WorkflowContext(BaseContext):
     """
     Context object used during workflow creation and execution
     """
-
-    def __init__(
-            self,
-            name,
-            model_storage,
-            resource_storage,
-            deployment_id,
-            workflow_id,
-            execution_id=None,
-            parameters=None,
-            task_max_attempts=1,
-            task_retry_interval=0,
-            **kwargs):
-        super(WorkflowContext, self).__init__(**kwargs)
-        self.name = name
-        self.id = str(uuid4())
-        self.model = model_storage
-        self.resource = resource_storage
-        self.deployment_id = deployment_id
-        self.workflow_id = workflow_id
-        self.execution_id = execution_id or str(uuid4())
-        self.parameters = parameters or {}
-        self.task_max_attempts = task_max_attempts
-        self.task_retry_interval = task_retry_interval
+    def __init__(self, *args, **kwargs):
+        super(WorkflowContext, self).__init__(*args, **kwargs)
         # TODO: execution creation should happen somewhere else
         # should be moved there, when such logical place exists
         try:
-            self.model.execution.get(self.execution_id)
+            self.model.execution.get(self._execution_id)
         except exceptions.StorageError:
             self._create_execution()
 
     def __repr__(self):
         return (
-            '{name}(deployment_id={self.deployment_id}, '
-            'workflow_id={self.workflow_id}, '
-            'execution_id={self.execution_id})'.format(
+            '{name}(deployment_id={self._deployment_id}, '
+            'workflow_id={self._workflow_id}, '
+            'execution_id={self._execution_id})'.format(
                 name=self.__class__.__name__, self=self))
 
     def _create_execution(self):
         execution_cls = self.model.execution.model_cls
         execution = self.model.execution.model_cls(
-            id=self.execution_id,
-            deployment_id=self.deployment_id,
-            workflow_id=self.workflow_id,
-            blueprint_id=self.blueprint_id,
+            id=self._execution_id,
+            deployment_id=self.deployment.id,
+            workflow_id=self._workflow_id,
+            blueprint_id=self.blueprint.id,
             status=execution_cls.PENDING,
             parameters=self.parameters,
         )
         self.model.execution.store(execution)
 
     @property
-    def blueprint_id(self):
-        """
-        The blueprint id
-        """
-        return self.deployment.blueprint_id
-
-    @property
-    @lru_cache()
-    def blueprint(self):
-        """
-        The blueprint model
-        """
-        return self.model.blueprint.get(self.blueprint_id)
-
-    @property
-    @lru_cache()
-    def deployment(self):
-        """
-        The deployment model
-        """
-        return self.model.deployment.get(self.deployment_id)
-
-    @property
     def nodes(self):
         """
         Iterator over nodes
         """
-        return self.model.node.iter(
-            filters={'blueprint_id': self.blueprint_id})
+        return self.model.node.iter(filters={'blueprint_id': self.blueprint.id})
 
     @property
     def node_instances(self):
         """
         Iterator over node instances
         """
-        return self.model.node_instance.iter(filters={'deployment_id': self.deployment_id})
-
-    @property
-    def execution(self):
-        """
-        The execution model
-        """
-        return self.model.execution.get(self.execution_id)
-
-    @execution.setter
-    def execution(self, value):
-        """
-        Store the execution in the model storage
-        """
-        self.model.execution.store(value)
-
-    def download_blueprint_resource(self, destination, path=None):
-        """
-        Download a blueprint resource from the resource storage
-        """
-        return self.resource.blueprint.download(
-            entry_id=self.blueprint_id,
-            destination=destination,
-            path=path)
-
-    def download_deployment_resource(self, destination, path=None):
-        """
-        Download a deployment resource from the resource storage
-        """
-        return self.resource.deployment.download(
-            entry_id=self.deployment_id,
-            destination=destination,
-            path=path)
-
-    @lru_cache()
-    def get_deployment_resource_data(self, path=None):
-        """
-        Read a deployment resource as string from the resource storage
-        """
-        return self.resource.deployment.data(entry_id=self.deployment_id, path=path)
-
-    @lru_cache()
-    def get_blueprint_resource_data(self, path=None):
-        """
-        Read a blueprint resource as string from the resource storage
-        """
-        return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path)
+        return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id})
 
 
 class _CurrentContext(threading.local):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/decorators.py
----------------------------------------------------------------------
diff --git a/aria/decorators.py b/aria/decorators.py
index a07e2ee..ddaa545 100644
--- a/aria/decorators.py
+++ b/aria/decorators.py
@@ -25,23 +25,17 @@ from .workflows.api import task_graph
 from .tools.validation import validate_function_arguments
 
 
-def workflow(
-        func=None,
-        simple_workflow=True,
-        suffix_template=''):
+def workflow(func=None, suffix_template=''):
     """
     Workflow decorator
     """
     if func is None:
-        return partial(
-            workflow,
-            simple_workflow=simple_workflow,
-            suffix_template=suffix_template)
+        return partial(workflow, suffix_template=suffix_template)
 
     @wraps(func)
     def _wrapper(ctx, **workflow_parameters):
 
-        workflow_name = _generate_workflow_name(
+        workflow_name = _generate_name(
             func_name=func.__name__,
             suffix_template=suffix_template,
             ctx=ctx,
@@ -56,34 +50,24 @@ def workflow(
     return _wrapper
 
 
-def operation(
-        func=None):
+def operation(func=None, toolbelt=False, suffix_template=''):
     """
     Operation decorator
     """
     if func is None:
-        return partial(operation)
+        return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt)
 
     @wraps(func)
-    def _wrapper(ctx, **custom_kwargs):
-        func_kwargs = _create_func_kwargs(
-            custom_kwargs,
-            ctx)
-        validate_function_arguments(func, func_kwargs)
-        ctx.description = func.__doc__
-        return func(**func_kwargs)
+    def _wrapper(**func_kwargs):
+        with context.toolbelt(func_kwargs.get('ctx')) as operation_toolbelt:
+            if toolbelt:
+                func_kwargs.setdefault('toolbelt', operation_toolbelt)
+            validate_function_arguments(func, func_kwargs)
+            return func(**func_kwargs)
     return _wrapper
 
 
-def _generate_workflow_name(func_name, ctx, suffix_template, **custom_kwargs):
+def _generate_name(func_name, ctx, suffix_template, **custom_kwargs):
     return '{func_name}.{suffix}'.format(
         func_name=func_name,
         suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4()))
-
-
-def _create_func_kwargs(
-        kwargs,
-        ctx,
-        workflow_name=None):
-    kwargs.setdefault('graph', ctx.task_graph(workflow_name))
-    return kwargs

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 0002cb5..0fbf6cc 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -39,7 +39,7 @@ class LoggerMixin(object):
         self.logger_name = self.logger_name or self.__class__.__name__
         self.logger = logging.getLogger('{0}.{1}'.format(_base_logger.name, self.logger_name))
         self.logger.setLevel(self.logger_level)
-        super(LoggerMixin, self).__init__(*args, **kwargs)
+        super(LoggerMixin, self).__init__()
 
     @classmethod
     def with_logger(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index 23f2408..1848629 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -241,6 +241,7 @@ class Relationship(Model):
     A Model which represents a relationship
     """
     id = Field(type=basestring, default=uuid_generator)
+    source_id = Field(type=basestring)
     target_id = Field(type=basestring)
     source_interfaces = Field(type=dict)
     source_operations = Field(type=dict)
@@ -290,6 +291,8 @@ class RelationshipInstance(Model):
     id = Field(type=basestring, default=uuid_generator)
     target_id = Field(type=basestring)
     target_name = Field(type=basestring)
+    source_id = Field(type=basestring)
+    source_name = Field(type=basestring)
     type = Field(type=basestring)
     relationship = PointerField(type=Relationship)
 
@@ -436,5 +439,5 @@ class Task(Model):
     # Operation specific fields
     name = Field(type=basestring)
     operation_details = Field(type=dict)
-    node_instance = PointerField(type=NodeInstance)
+    actor = Field()
     inputs = Field(type=dict, default=lambda: {})

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/storage/structures.py
----------------------------------------------------------------------
diff --git a/aria/storage/structures.py b/aria/storage/structures.py
index a26e7eb..399922e 100644
--- a/aria/storage/structures.py
+++ b/aria/storage/structures.py
@@ -26,7 +26,6 @@ classes:
     * IterPointerField - represents an iterable pointers field.
     * Model - abstract model implementation.
 """
-
 import json
 from uuid import uuid4
 from itertools import count

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py
index bb8e045..ba54b31 100644
--- a/aria/workflows/api/task.py
+++ b/aria/workflows/api/task.py
@@ -18,7 +18,10 @@ Provides the tasks to be entered into the task graph
 """
 from uuid import uuid4
 
-from ... import context
+from ... import (
+    context,
+    storage,
+)
 
 
 class BaseTask(object):
@@ -57,7 +60,7 @@ class OperationTask(BaseTask):
     def __init__(self,
                  name,
                  operation_details,
-                 node_instance,
+                 actor,
                  max_attempts=None,
                  retry_interval=None,
                  inputs=None):
@@ -65,13 +68,15 @@ class OperationTask(BaseTask):
         Creates an operation task using the name, details, node instance and any additional kwargs.
         :param name: the operation of the name.
         :param operation_details: the details for the operation.
-        :param node_instance: the node instance on which this operation is registered.
+        :param actor: the operation host on which this operation is registered.
         :param inputs: operation inputs.
         """
+        assert isinstance(actor, (storage.models.NodeInstance,
+                                               storage.models.RelationshipInstance))
         super(OperationTask, self).__init__()
         self.name = name
         self.operation_details = operation_details
-        self.node_instance = node_instance
+        self.actor = actor
         self.inputs = inputs or {}
         self.max_attempts = (self.workflow_context.task_max_attempts
                              if max_attempts is None else max_attempts)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/workflows/builtin/heal.py
----------------------------------------------------------------------
diff --git a/aria/workflows/builtin/heal.py b/aria/workflows/builtin/heal.py
index dc320dc..7174de3 100644
--- a/aria/workflows/builtin/heal.py
+++ b/aria/workflows/builtin/heal.py
@@ -99,12 +99,10 @@ def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances):
 
             if target_node_instance in failing_node_instances:
                 dependency = relationship_tasks(
-                    graph=graph,
                     node_instance=node_instance,
                     relationship_instance=relationship_instance,
-                    context=ctx,
                     operation_name='aria.interfaces.relationship_lifecycle.unlink')
-
+                graph.add_tasks(*dependency)
                 graph.add_dependency(node_instance_sub_workflow, dependency)
 
 
@@ -154,12 +152,10 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances):
 
             if target_node_instance in failing_node_instances:
                 dependent = relationship_tasks(
-                    graph=graph,
                     node_instance=node_instance,
                     relationship_instance=relationship_instance,
-                    context=ctx,
                     operation_name='aria.interfaces.relationship_lifecycle.establish')
-
+                graph.add_tasks(*dependent)
                 graph.add_dependency(dependent, node_instance_sub_workflow)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/workflows/builtin/workflows.py b/aria/workflows/builtin/workflows.py
index fc54f75..f9117ac 100644
--- a/aria/workflows/builtin/workflows.py
+++ b/aria/workflows/builtin/workflows.py
@@ -34,7 +34,7 @@ __all__ = (
 # Install node instance workflow and sub workflows
 
 @workflow(suffix_template='{node_instance.id}')
-def install_node_instance(ctx, graph, node_instance):
+def install_node_instance(graph, node_instance, **kwargs):
     """
     A workflow which installs a node instance.
     :param WorkflowContext ctx: the workflow context
@@ -45,32 +45,32 @@ def install_node_instance(ctx, graph, node_instance):
     create_node_instance = task.OperationTask(
         name='aria.interfaces.lifecycle.create.{0}'.format(node_instance.id),
         operation_details=node_instance.node.operations['aria.interfaces.lifecycle.create'],
-        node_instance=node_instance
+        actor=node_instance
     )
     configure_node_instance = task.OperationTask(
         name='aria.interfaces.lifecycle.configure.{0}'.format(node_instance.id),
         operation_details=node_instance.node.operations['aria.interfaces.lifecycle.configure'],
-        node_instance=node_instance
+        actor=node_instance
         )
     start_node_instance = task.OperationTask(
         name='aria.interfaces.lifecycle.start.{0}'.format(node_instance.id),
         operation_details=node_instance.node.operations['aria.interfaces.lifecycle.start'],
-        node_instance=node_instance
+        actor=node_instance
     )
 
     graph.sequence(
         create_node_instance,
-        preconfigure_relationship(graph, ctx, node_instance),
+        preconfigure_relationship(graph, node_instance),
         configure_node_instance,
-        postconfigure_relationship(graph, ctx, node_instance),
+        postconfigure_relationship(graph, node_instance),
         start_node_instance,
-        establish_relationship(graph, ctx, node_instance)
+        establish_relationship(graph, node_instance)
     )
 
     return graph
 
 
-def preconfigure_relationship(graph, ctx, node_instance):
+def preconfigure_relationship(graph, node_instance, **kwargs):
     """
 
     :param context:
@@ -81,11 +81,10 @@ def preconfigure_relationship(graph, ctx, node_instance):
     return relationships_tasks(
         graph=graph,
         operation_name='aria.interfaces.relationship_lifecycle.preconfigure',
-        context=ctx,
         node_instance=node_instance)
 
 
-def postconfigure_relationship(graph, ctx, node_instance):
+def postconfigure_relationship(graph, node_instance, **kwargs):
     """
 
     :param context:
@@ -96,11 +95,10 @@ def postconfigure_relationship(graph, ctx, node_instance):
     return relationships_tasks(
         graph=graph,
         operation_name='aria.interfaces.relationship_lifecycle.postconfigure',
-        context=ctx,
         node_instance=node_instance)
 
 
-def establish_relationship(graph, ctx, node_instance):
+def establish_relationship(graph, node_instance, **kwargs):
     """
 
     :param context:
@@ -111,14 +109,13 @@ def establish_relationship(graph, ctx, node_instance):
     return relationships_tasks(
         graph=graph,
         operation_name='aria.interfaces.relationship_lifecycle.establish',
-        context=ctx,
         node_instance=node_instance)
 
 
 # Uninstall node instance workflow and subworkflows
 
 @workflow(suffix_template='{node_instance.id}')
-def uninstall_node_instance(ctx, graph, node_instance):
+def uninstall_node_instance(graph, node_instance, **kwargs):
     """
         A workflow which uninstalls a node instance.
         :param WorkflowContext context: the workflow context
@@ -129,22 +126,22 @@ def uninstall_node_instance(ctx, graph, node_instance):
     stop_node_instance = task.OperationTask(
         name='aria.interfaces.lifecycle.stop.{0}'.format(node_instance.id),
         operation_details=node_instance.node.operations['aria.interfaces.lifecycle.stop'],
-        node_instance=node_instance
+        actor=node_instance,
     )
     delete_node_instance = task.OperationTask(
         name='aria.interfaces.lifecycle.delete.{0}'.format(node_instance.id),
         operation_details=node_instance.node.operations['aria.interfaces.lifecycle.delete'],
-        node_instance=node_instance
+        actor=node_instance
     )
 
     graph.sequence(
         stop_node_instance,
-        unlink_relationship(graph, ctx, node_instance),
+        unlink_relationship(graph, node_instance),
         delete_node_instance
     )
 
 
-def unlink_relationship(graph, ctx, node_instance):
+def unlink_relationship(graph, node_instance):
     """
 
     :param context:
@@ -155,7 +152,6 @@ def unlink_relationship(graph, ctx, node_instance):
     return relationships_tasks(
         graph=graph,
         operation_name='aria.interfaces.relationship_lifecycle.unlink',
-        context=ctx,
         node_instance=node_instance
     )
 
@@ -167,8 +163,6 @@ def execute_operation_on_instance(
         allow_kwargs_override):
     """
     A workflow which executes a single operation
-    :param WorkflowContext context: the workflow to execute.
-    :param TaskGraph graph: the tasks graph of which to edit
     :param node_instance: the node instance to install
     :param basestring operation: the operation name
     :param dict operation_kwargs:
@@ -186,15 +180,11 @@ def execute_operation_on_instance(
     return task.OperationTask(
         name=task_name,
         operation_details=node_instance.node.operations[operation],
-        node_instance=node_instance,
+        actor=node_instance,
         inputs=operation_kwargs)
 
 
-
-def relationships_tasks(graph,
-                        operation_name,
-                        context,
-                        node_instance):
+def relationships_tasks(graph, operation_name, node_instance):
     """
     Creates a relationship task (source and target) for all of a node_instance relationships.
     :param basestring operation_name: the relationship operation name.
@@ -210,10 +200,8 @@ def relationships_tasks(graph,
     for index, (_, relationship_group) in enumerate(relationships_groups):
         for relationship_instance in relationship_group:
             relationship_operations = relationship_tasks(
-                graph=graph,
                 node_instance=node_instance,
                 relationship_instance=relationship_instance,
-                context=context,
                 operation_name=operation_name,
                 index=index)
             sub_tasks.append(relationship_operations)
@@ -221,12 +209,7 @@ def relationships_tasks(graph,
     return graph.sequence(*sub_tasks)
 
 
-def relationship_tasks(graph,
-                       node_instance,
-                       relationship_instance,
-                       context,
-                       operation_name,
-                       index=None):
+def relationship_tasks(node_instance, relationship_instance, operation_name, index=None):
     """
     Creates a relationship task source and target.
     :param NodeInstance node_instance: the node instance of the relationship
@@ -245,12 +228,10 @@ def relationship_tasks(graph,
     )
     source_operation = task.OperationTask(
         name=operation_name_template.format('source'),
-        node_instance=node_instance,
-        operation_details=relationship_instance.relationship.source_operations[
-            operation_name])
+        actor=relationship_instance,
+        operation_details=relationship_instance.relationship.source_operations[operation_name])
     target_operation = task.OperationTask(
         name=operation_name_template.format('target'),
-        node_instance=context.model.node_instance.get(relationship_instance.target_id),
-        operation_details=relationship_instance.relationship.target_operations[
-            operation_name])
-    return graph.add_tasks(source_operation, target_operation)
+        actor=relationship_instance,
+        operation_details=relationship_instance.relationship.target_operations[operation_name])
+    return source_operation, target_operation

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/__init__.py b/aria/workflows/core/__init__.py
index 646f44a..e377153 100644
--- a/aria/workflows/core/__init__.py
+++ b/aria/workflows/core/__init__.py
@@ -17,4 +17,4 @@
 Core for the workflow execution mechanism
 """
 
-from . import task
+from . import task, translation, engine

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py
index b90306a..9e8473e 100644
--- a/aria/workflows/core/task.py
+++ b/aria/workflows/core/task.py
@@ -18,12 +18,29 @@ Workflow tasks
 """
 from contextlib import contextmanager
 from datetime import datetime
+from functools import (
+    partial,
+    wraps,
+)
 
 from ... import logger
 from ...storage import models
+from ...context import operation as operation_context
 from .. import exceptions
 
 
+def _locked(func=None):
+    if func is None:
+        return partial(_locked, func=_locked)
+
+    @wraps(func)
+    def _wrapper(self, value, **kwargs):
+        if self._update_fields is None:
+            raise exceptions.TaskException("Task is not in update mode")
+        return func(self, value, **kwargs)
+    return _wrapper
+
+
 class BaseTask(logger.LoggerMixin):
     """
     Base class for Task objects
@@ -80,7 +97,7 @@ class EndSubWorkflowTask(StubTask):
     pass
 
 
-class OperationTask(BaseTask, logger.LoggerMixin):
+class OperationTask(BaseTask):
     """
     Operation tasks
     """
@@ -89,18 +106,31 @@ class OperationTask(BaseTask, logger.LoggerMixin):
         super(OperationTask, self).__init__(id=api_task.id, **kwargs)
         self._workflow_ctx = api_task.workflow_context
         task_model = api_task.workflow_context.model.task.model_cls
-        task = task_model(
+        operation_task = task_model(
+            id=api_task.id,
             name=api_task.name,
             operation_details=api_task.operation_details,
-            node_instance=api_task.node_instance,
+            actor=api_task.actor,
             inputs=api_task.inputs,
             status=task_model.PENDING,
-            execution_id=self.workflow_context.execution_id,
+            execution_id=self.workflow_context._execution_id,
             max_attempts=api_task.max_attempts,
             retry_interval=api_task.retry_interval,
         )
-        self.workflow_context.model.task.store(task)
-        self._task_id = task.id
+
+        if isinstance(api_task.actor, models.NodeInstance):
+            context_class = operation_context.NodeOperationContext
+        elif isinstance(api_task.actor, models.RelationshipInstance):
+            context_class = operation_context.RelationshipOperationContext
+        else:
+            context_class = None
+
+        self._ctx = context_class(name=api_task.name,
+                                  workflow_context=self.workflow_context,
+                                  task=operation_task)
+
+        self.workflow_context.model.task.store(operation_task)
+        self._task_id = operation_task.id
         self._update_fields = None
 
     @contextmanager
@@ -112,10 +142,10 @@ class OperationTask(BaseTask, logger.LoggerMixin):
         self._update_fields = {}
         try:
             yield
-            task = self.context
+            task = self.model_context
             for key, value in self._update_fields.items():
                 setattr(task, key, value)
-            self.context = task
+            self.model_context = task
         finally:
             self._update_fields = None
 
@@ -127,28 +157,37 @@ class OperationTask(BaseTask, logger.LoggerMixin):
         return self._workflow_ctx
 
     @property
-    def context(self):
+    def model_context(self):
         """
         Returns the task model in storage
         :return: task in storage
         """
         return self.workflow_context.model.task.get(self._task_id)
 
-    @context.setter
-    def context(self, value):
+    @model_context.setter
+    def model_context(self, value):
         self.workflow_context.model.task.store(value)
 
     @property
+    def context(self):
+        """
+        Contexts for the operation
+        :return:
+        """
+        return self._ctx
+
+    @property
     def status(self):
         """
         Returns the task status
         :return: task status
         """
-        return self.context.status
+        return self.model_context.status
 
     @status.setter
+    @_locked
     def status(self, value):
-        self._update_property('status', value)
+        self._update_fields['status'] = value
 
     @property
     def started_at(self):
@@ -156,11 +195,12 @@ class OperationTask(BaseTask, logger.LoggerMixin):
         Returns when the task started
         :return: when task started
         """
-        return self.context.started_at
+        return self.model_context.started_at
 
     @started_at.setter
+    @_locked
     def started_at(self, value):
-        self._update_property('started_at', value)
+        self._update_fields['started_at'] = value
 
     @property
     def ended_at(self):
@@ -168,11 +208,12 @@ class OperationTask(BaseTask, logger.LoggerMixin):
         Returns when the task ended
         :return: when task ended
         """
-        return self.context.ended_at
+        return self.model_context.ended_at
 
     @ended_at.setter
+    @_locked
     def ended_at(self, value):
-        self._update_property('ended_at', value)
+        self._update_fields['ended_at'] = value
 
     @property
     def retry_count(self):
@@ -180,11 +221,12 @@ class OperationTask(BaseTask, logger.LoggerMixin):
         Returns the retry count for the task
         :return: retry count
         """
-        return self.context.retry_count
+        return self.model_context.retry_count
 
     @retry_count.setter
+    @_locked
     def retry_count(self, value):
-        self._update_property('retry_count', value)
+        self._update_fields['retry_count'] = value
 
     @property
     def due_at(self):
@@ -192,19 +234,15 @@ class OperationTask(BaseTask, logger.LoggerMixin):
         Returns the minimum datetime in which the task can be executed
         :return: eta
         """
-        return self.context.due_at
+        return self.model_context.due_at
 
     @due_at.setter
+    @_locked
     def due_at(self, value):
-        self._update_property('due_at', value)
+        self._update_fields['due_at'] = value
 
     def __getattr__(self, attr):
         try:
-            return getattr(self.context, attr)
+            return getattr(self.model_context, attr)
         except AttributeError:
             return super(OperationTask, self).__getattribute__(attr)
-
-    def _update_property(self, key, value):
-        if self._update_fields is None:
-            raise exceptions.TaskException("Task is not in update mode")
-        self._update_fields[key] = value

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/__init__.py b/aria/workflows/executor/__init__.py
index ae1e83e..09fb12c 100644
--- a/aria/workflows/executor/__init__.py
+++ b/aria/workflows/executor/__init__.py
@@ -12,3 +12,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+"""
+Executors for task execution
+"""
+
+
+from . import blocking, celery, multiprocess, thread

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/workflows/executor/blocking.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/blocking.py b/aria/workflows/executor/blocking.py
index f072d8a..1647264 100644
--- a/aria/workflows/executor/blocking.py
+++ b/aria/workflows/executor/blocking.py
@@ -30,7 +30,7 @@ class CurrentThreadBlockingExecutor(BaseExecutor):
         self._task_started(task)
         try:
             task_func = module.load_attribute(task.operation_details['operation'])
-            task_func(**task.inputs)
+            task_func(ctx=task.context, **task.inputs)
             self._task_succeeded(task)
         except BaseException as e:
             self._task_failed(task, exception=e)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/celery.py b/aria/workflows/executor/celery.py
index a82a6b7..53ef41b 100644
--- a/aria/workflows/executor/celery.py
+++ b/aria/workflows/executor/celery.py
@@ -46,6 +46,7 @@ class CeleryExecutor(BaseExecutor):
         self._tasks[task.id] = task
         self._results[task.id] = self._app.send_task(
             task.operation_details['operation'],
+            ctx=task.model_context,
             kwargs=task.inputs,
             task_id=task.id,
             queue=self._get_queue(task))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/multiprocess.py b/aria/workflows/executor/multiprocess.py
index 4af08c0..c6423b6 100644
--- a/aria/workflows/executor/multiprocess.py
+++ b/aria/workflows/executor/multiprocess.py
@@ -46,6 +46,7 @@ class MultiprocessExecutor(BaseExecutor):
         self._tasks[task.id] = task
         self._pool.apply_async(_multiprocess_handler, args=(
             self._queue,
+            task.context,
             task.id,
             task.operation_details,
             task.inputs))
@@ -86,11 +87,11 @@ class _MultiprocessMessage(object):
         self.exception = exception
 
 
-def _multiprocess_handler(queue, task_id, operation_details, operation_inputs):
+def _multiprocess_handler(queue, ctx, task_id, operation_details, operation_inputs):
     queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
     try:
         task_func = module.load_attribute(operation_details['operation'])
-        task_func(**operation_inputs)
+        task_func(ctx=ctx, **operation_inputs)
         queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id))
     except BaseException as e:
         queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/aria/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/workflows/executor/thread.py b/aria/workflows/executor/thread.py
index 180c482..a6b616b 100644
--- a/aria/workflows/executor/thread.py
+++ b/aria/workflows/executor/thread.py
@@ -56,7 +56,7 @@ class ThreadExecutor(BaseExecutor):
                 self._task_started(task)
                 try:
                     task_func = module.load_attribute(task.operation_details['operation'])
-                    task_func(**task.inputs)
+                    task_func(ctx=task.context, **task.inputs)
                     self._task_succeeded(task)
                 except BaseException as e:
                     self._task_failed(task, exception=e)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/context/__init__.py b/tests/context/__init__.py
index ae1e83e..bb0fa61 100644
--- a/tests/context/__init__.py
+++ b/tests/context/__init__.py
@@ -12,3 +12,27 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import sys
+
+import pytest
+
+from aria.workflows.core import engine
+
+global_test_holder = {}
+
+
+def op_path(func, module_path=None):
+    module_path = module_path or sys.modules[__name__].__name__
+    return '{0}.{1}'.format(module_path, func.__name__)
+
+
+def execute(workflow_func, workflow_context, executor):
+    graph = workflow_func(ctx=workflow_context)
+    eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph)
+    eng.execute()
+
+
+@pytest.fixture(autouse=True)
+def cleanup():
+    global_test_holder.clear()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/context/test_operation.py b/tests/context/test_operation.py
new file mode 100644
index 0000000..1d3430b
--- /dev/null
+++ b/tests/context/test_operation.py
@@ -0,0 +1,158 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+import pytest
+
+from aria import (
+    workflow,
+    operation,
+    context,
+)
+from aria.workflows import api
+from aria.workflows.executor import thread
+
+from .. import mock
+from . import (
+    op_path,
+    execute,
+    global_test_holder,
+)
+
+
+@pytest.fixture
+def workflow_context():
+    return mock.context.simple()
+
+
+@pytest.fixture
+def executor():
+    result = thread.ThreadExecutor()
+    try:
+        yield result
+    finally:
+        result.close()
+
+
+def test_node_operation_task_execution(workflow_context, executor):
+    node = mock.models.get_dependency_node()
+    node_instance = mock.models.get_dependency_node_instance(node)
+    workflow_context.model.node.store(node)
+    workflow_context.model.node_instance.store(node_instance)
+
+    node_instance = \
+        workflow_context.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
+    name = 'op_name'
+    operation_details = {
+        'operation': op_path(my_operation, module_path=sys.modules[__name__].__name__)
+    }
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask(
+                name=name,
+                operation_details=operation_details,
+                actor=node_instance,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
+
+    operation_value = global_test_holder[name]
+
+    assert isinstance(operation_value, context.operation.NodeOperationContext)
+
+    # The context's actor mirrors every stored field of the node instance
+    for key, value in node_instance.fields_dict.items():
+        assert getattr(operation_value._actor, key) == value
+
+    # Task-based assertions
+    assert operation_value.task.actor == node_instance
+    assert operation_value.task.name == name
+    assert operation_value.task.operation_details == operation_details
+    assert operation_value.task.inputs == inputs
+
+    # Context-level convenience attributes (sugaring for node/node_instance access)
+    assert operation_value.node == node_instance.node
+    assert operation_value.node_instance == node_instance
+
+
+def test_relationship_operation_task_execution(workflow_context, executor):
+    dependency_node = mock.models.get_dependency_node()
+    dependency_node_instance = mock.models.get_dependency_node_instance()
+    relationship = mock.models.get_relationship(target=dependency_node)
+    relationship_instance = mock.models.get_relationship_instance(
+        target_instance=dependency_node_instance,
+        relationship=relationship)
+    dependent_node = mock.models.get_dependent_node()
+    dependent_node_instance = mock.models.get_dependent_node_instance(
+        relationship_instance=relationship_instance,
+        dependent_node=dependency_node)
+    workflow_context.model.node.store(dependency_node)
+    workflow_context.model.node_instance.store(dependency_node_instance)
+    workflow_context.model.relationship.store(relationship)
+    workflow_context.model.relationship_instance.store(relationship_instance)
+    workflow_context.model.node.store(dependent_node)
+    workflow_context.model.node_instance.store(dependent_node_instance)
+
+    name = 'op_name'
+    operation_details = {
+        'operation': op_path(my_operation, module_path=sys.modules[__name__].__name__)
+    }
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask(
+                name=name,
+                operation_details=operation_details,
+                actor=relationship_instance,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
+
+    operation_value = global_test_holder[name]
+
+    assert isinstance(operation_value, context.operation.RelationshipOperationContext)
+
+    # The context's actor mirrors every stored field of the relationship instance
+    for key, value in relationship_instance.fields_dict.items():
+        assert getattr(operation_value._actor, key) == value
+
+    # Task-based assertions
+    assert operation_value.task.actor == relationship_instance
+    assert operation_value.task.name == name
+    assert operation_value.task.operation_details == operation_details
+    assert operation_value.task.inputs == inputs
+
+    # Context based attributes (sugaring)
+    assert operation_value.target_node == dependency_node
+    assert operation_value.target_node_instance == dependency_node_instance
+    assert operation_value.relationship == relationship
+    assert operation_value.relationship_instance == relationship_instance
+    assert operation_value.node == dependent_node
+    assert operation_value.node_instance == dependent_node_instance
+
+
+@operation
+def my_operation(ctx, **_):
+    global_test_holder[ctx.name] = ctx

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/context/test_toolbelt.py b/tests/context/test_toolbelt.py
new file mode 100644
index 0000000..c4796f7
--- /dev/null
+++ b/tests/context/test_toolbelt.py
@@ -0,0 +1,142 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sys
+
+import pytest
+
+from aria import workflow, operation, context, exceptions
+from aria.workflows import api
+from aria.workflows.executor import thread
+from aria.context.toolbelt import _RelationshipToolBelt, _NodeToolBelt
+
+from .. import mock
+from . import (
+    op_path,
+    execute,
+    global_test_holder,
+)
+
+
+@pytest.fixture
+def workflow_context():
+    return mock.context.simple()
+
+
+@pytest.fixture
+def executor():
+    result = thread.ThreadExecutor()
+    try:
+        yield result
+    finally:
+        result.close()
+
+
+def test_operation_tool_belt(workflow_context, executor):
+
+    dependency_node = mock.models.get_dependency_node()
+    dependency_node_instance = mock.models.get_dependency_node_instance()
+    relationship = mock.models.get_relationship(target=dependency_node)
+    relationship_instance = mock.models.get_relationship_instance(
+        target_instance=dependency_node_instance, relationship=relationship)
+    dependent_node = mock.models.get_dependent_node()
+    dependent_node_instance = mock.models.get_dependent_node_instance(
+        relationship_instance=relationship_instance, dependent_node=dependency_node)
+    workflow_context.model.node.store(dependency_node)
+    workflow_context.model.node_instance.store(dependency_node_instance)
+    workflow_context.model.relationship.store(relationship)
+    workflow_context.model.relationship_instance.store(relationship_instance)
+    workflow_context.model.node.store(dependent_node)
+    workflow_context.model.node_instance.store(dependent_node_instance)
+
+    name = 'op_name'
+    operation_details = {'operation': op_path(node_operation,
+                                              module_path=sys.modules[__name__].__name__)}
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask(
+                name=name,
+                operation_details=operation_details,
+                actor=dependency_node_instance,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
+
+    assert isinstance(global_test_holder.get(name), _NodeToolBelt)
+    assert list(global_test_holder.get('relationships_to_me', [])) == list([relationship_instance])
+    assert global_test_holder.get('ip') == dependency_node_instance.runtime_properties.get('ip')
+
+
+def test_relationship_tool_belt(workflow_context, executor):
+    dependency_node = mock.models.get_dependency_node()
+    dependency_node_instance = \
+        mock.models.get_dependency_node_instance(dependency_node=dependency_node)
+    relationship = mock.models.get_relationship(target=dependency_node)
+    relationship_instance = \
+        mock.models.get_relationship_instance(target_instance=dependency_node_instance,
+                                              relationship=relationship)
+    dependent_node = mock.models.get_dependent_node(relationship=relationship)
+    dependent_node_instance = \
+        mock.models.get_dependent_node_instance(relationship_instance=relationship_instance,
+                                                dependent_node=dependent_node)
+    workflow_context.model.node.store(dependency_node)
+    workflow_context.model.node_instance.store(dependency_node_instance)
+    workflow_context.model.relationship.store(relationship)
+    workflow_context.model.relationship_instance.store(relationship_instance)
+    workflow_context.model.node.store(dependent_node)
+    workflow_context.model.node_instance.store(dependent_node_instance)
+
+    name = 'op_name'
+    operation_details = {'operation': op_path(relationship_operation,
+                                              module_path=sys.modules[__name__].__name__)}
+    inputs = {'putput': True}
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask(
+                name=name,
+                operation_details=operation_details,
+                actor=relationship_instance,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
+
+    assert isinstance(global_test_holder.get(name), _RelationshipToolBelt)
+
+
+def test_wrong_model_toolbelt():
+    with pytest.raises(exceptions.TaskException):
+        context.toolbelt(None)
+
+
+@operation(toolbelt=True)
+def node_operation(ctx, toolbelt, **_):
+    global_test_holder['relationships_to_me'] = list(toolbelt.relationships_to_me)
+    global_test_holder['ip'] = toolbelt.host_ip
+    global_test_holder[ctx.name] = toolbelt
+
+
+@operation(toolbelt=True)
+def relationship_operation(ctx, toolbelt, **_):
+    global_test_holder[ctx.name] = toolbelt
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 4d218b5..11c47d9 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -22,6 +22,7 @@ from ..storage import InMemoryModelDriver
 def simple():
     storage = application_model_storage(InMemoryModelDriver())
     storage.setup()
+    storage.blueprint.store(models.get_blueprint())
     storage.deployment.store(models.get_deployment())
     return context.workflow.WorkflowContext(
         name='simple_context',

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 295149e..327b0b9 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -26,11 +26,16 @@ EXECUTION_ID = 'test_execution_id'
 TASK_RETRY_INTERVAL = 1
 TASK_MAX_ATTEMPTS = 1
 
+DEPENDENCY_NODE_ID = 'dependency_node'
+DEPENDENCY_NODE_INSTANCE_ID = 'dependency_node_instance'
+DEPENDENT_NODE_ID = 'dependent_node'
+DEPENDENT_NODE_INSTANCE_ID = 'dependent_node_instance'
+
 
 def get_dependency_node():
     return models.Node(
-        id='dependency_node',
-        host_id='dependency_node',
+        id=DEPENDENCY_NODE_ID,
+        host_id=DEPENDENCY_NODE_ID,
         blueprint_id=BLUEPRINT_ID,
         type='test_node_type',
         type_hierarchy=[],
@@ -47,19 +52,20 @@ def get_dependency_node():
 
 def get_dependency_node_instance(dependency_node=None):
     return models.NodeInstance(
-        id='dependency_node_instance',
-        host_id='dependency_node_instance',
+        id=DEPENDENCY_NODE_INSTANCE_ID,
+        host_id=DEPENDENCY_NODE_INSTANCE_ID,
         deployment_id=DEPLOYMENT_ID,
-        runtime_properties={},
+        runtime_properties={'ip': '1.1.1.1'},
         version=None,
         relationship_instances=[],
         node=dependency_node or get_dependency_node()
     )
 
 
-def get_relationship(target=None):
+def get_relationship(source=None, target=None):
     return models.Relationship(
-        target_id=target.id or get_dependency_node().id,
+        source_id=source.id if source is not None else DEPENDENT_NODE_ID,
+        target_id=target.id if target is not None else DEPENDENCY_NODE_ID,
         source_interfaces={},
         source_operations=dict((key, {}) for key in operations.RELATIONSHIP_OPERATIONS),
         target_interfaces={},
@@ -70,10 +76,12 @@ def get_relationship(target=None):
     )
 
 
-def get_relationship_instance(target_instance=None, relationship=None):
+def get_relationship_instance(source_instance=None, target_instance=None, relationship=None):
     return models.RelationshipInstance(
-        target_id=target_instance.id or get_dependency_node_instance().id,
+        target_id=target_instance.id if target_instance else DEPENDENCY_NODE_INSTANCE_ID,
         target_name='test_target_name',
+        source_id=source_instance.id if source_instance else DEPENDENT_NODE_INSTANCE_ID,
+        source_name='test_source_name',
         type='some_type',
         relationship=relationship or get_relationship(target_instance.node
                                                       if target_instance else None)
@@ -82,8 +90,8 @@ def get_relationship_instance(target_instance=None, relationship=None):
 
 def get_dependent_node(relationship=None):
     return models.Node(
-        id='dependent_node',
-        host_id='dependent_node',
+        id=DEPENDENT_NODE_ID,
+        host_id=DEPENDENT_NODE_ID,
         blueprint_id=BLUEPRINT_ID,
         type='test_node_type',
         type_hierarchy=[],
@@ -98,10 +106,10 @@ def get_dependent_node(relationship=None):
     )
 
 
-def get_dependent_node_instance(relationship_instance, dependent_node=None):
+def get_dependent_node_instance(relationship_instance=None, dependent_node=None):
     return models.NodeInstance(
-        id='dependent_node_instance',
-        host_id='dependent_node_instance',
+        id=DEPENDENT_NODE_INSTANCE_ID,
+        host_id=DEPENDENT_NODE_INSTANCE_ID,
         deployment_id=DEPLOYMENT_ID,
         runtime_properties={},
         version=None,
@@ -110,6 +118,18 @@ def get_dependent_node_instance(relationship_instance, dependent_node=None):
     )
 
 
+def get_blueprint():
+    now = datetime.now()
+    return models.Blueprint(
+        plan={},
+        id=BLUEPRINT_ID,
+        description=None,
+        created_at=now,
+        updated_at=now,
+        main_file_name='main_file_name'
+    )
+
+
 def get_execution():
     return models.Execution(
         id=EXECUTION_ID,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/storage/test_models.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_models.py b/tests/storage/test_models.py
index 6a879fe..f3315f6 100644
--- a/tests/storage/test_models.py
+++ b/tests/storage/test_models.py
@@ -195,6 +195,7 @@ def _relationship(id=''):
     return Relationship(
         id='rel{0}'.format(id),
         target_id='target{0}'.format(id),
+        source_id='source{0}'.format(id),
         source_interfaces={},
         source_operations={},
         target_interfaces={},
@@ -248,6 +249,8 @@ def test_relationship_instance():
     relationship_instances = [RelationshipInstance(
         id='rel{0}'.format(index),
         target_id='target_{0}'.format(index % 2),
+        source_id='source_{0}'.format(index % 2),
+        source_name='',
         target_name='',
         relationship=relationship,
         type='type{0}'.format(index)) for index in xrange(3)]
@@ -350,7 +353,7 @@ def test_task_max_attempts_validation():
              name='name',
              operation_details={},
              inputs={},
-             node_instance=models.get_dependency_node_instance(),
+             actor=models.get_dependency_node_instance(),
              max_attempts=max_attempts)
     create_task(max_attempts=1)
     create_task(max_attempts=2)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py
index 857f3bf..f198b30 100644
--- a/tests/workflows/api/test_task.py
+++ b/tests/workflows/api/test_task.py
@@ -71,19 +71,19 @@ class TestOperationTask(object):
         retry_interval = 10
 
         with context.workflow.current.push(workflow_context):
-            model_task = api.task.OperationTask(name=name,
-                                                operation_details=op_details,
-                                                node_instance=node_instance,
-                                                inputs=inputs,
-                                                max_attempts=max_attempts,
-                                                retry_interval=retry_interval)
-
-        assert model_task.name == name
-        assert model_task.operation_details == op_details
-        assert model_task.node_instance == node_instance
-        assert model_task.inputs == inputs
-        assert model_task.retry_interval == retry_interval
-        assert model_task.max_attempts == max_attempts
+            api_task = api.task.OperationTask(name=name,
+                                              operation_details=op_details,
+                                              actor=node_instance,
+                                              inputs=inputs,
+                                              max_attempts=max_attempts,
+                                              retry_interval=retry_interval)
+
+        assert api_task.name == name
+        assert api_task.operation_details == op_details
+        assert api_task.actor == node_instance
+        assert api_task.inputs == inputs
+        assert api_task.retry_interval == retry_interval
+        assert api_task.max_attempts == max_attempts
 
     def test_operation_task_default_values(self):
         workflow_context = mock.context.simple()
@@ -91,7 +91,7 @@ class TestOperationTask(object):
             model_task = api.task.OperationTask(
                 name='stub',
                 operation_details={},
-                node_instance=mock.models.get_dependency_node_instance())
+                actor=mock.models.get_dependency_node_instance())
 
         assert model_task.inputs == {}
         assert model_task.retry_interval == workflow_context.task_retry_interval

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py
index 744e155..d878ad3 100644
--- a/tests/workflows/core/test_engine.py
+++ b/tests/workflows/core/test_engine.py
@@ -20,18 +20,24 @@ from datetime import datetime
 import pytest
 
 import aria
-from aria import events
-from aria import workflow
-from aria import context
+from aria import (
+    events,
+    workflow,
+    operation,
+    context
+)
 from aria.storage import models
-from aria.workflows import exceptions
-from aria.workflows.executor import thread
+from aria.workflows import (
+    api,
+    exceptions,
+)
 from aria.workflows.core import engine
-from aria.workflows import api
+from aria.workflows.executor import thread
 
-from tests import mock
 
 import tests.storage
+from tests import mock
+
 
 global_test_holder = {}
 
@@ -59,7 +65,7 @@ class BaseTest(object):
             name='task',
             operation_details={'operation': 'tests.workflows.core.test_engine.{name}'.format(
                 name=func.__name__)},
-            node_instance=ctx.model.node_instance.get('dependency_node_instance'),
+            actor=ctx.model.node_instance.get('dependency_node_instance'),
             inputs=inputs,
             max_attempts=max_attempts,
             retry_interval=retry_interval
@@ -107,7 +113,9 @@ class BaseTest(object):
 
     @pytest.fixture(scope='function')
     def executor(self):
-        result = thread.ThreadExecutor()
+        from aria.workflows.executor.blocking import CurrentThreadBlockingExecutor
+        result = CurrentThreadBlockingExecutor()
+        # result = thread.ThreadExecutor()
         try:
             yield result
         finally:
@@ -117,13 +125,9 @@ class BaseTest(object):
     def workflow_context(self):
         model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver())
         model_storage.setup()
-        deployment = models.Deployment(
-            id='d1',
-            blueprint_id='b1',
-            description=None,
-            created_at=datetime.utcnow(),
-            updated_at=datetime.utcnow(),
-            workflows={})
+        blueprint = mock.models.get_blueprint()
+        deployment = mock.models.get_deployment()
+        model_storage.blueprint.store(blueprint)
         model_storage.deployment.store(deployment)
         node = mock.models.get_dependency_node()
         node_instance = mock.models.get_dependency_node_instance(node)
@@ -373,20 +377,24 @@ class TestRetries(BaseTest):
         assert global_test_holder.get('sent_task_signal_calls') == 2
 
 
-def mock_success_task():
+@operation
+def mock_success_task(**_):
     pass
 
 
-def mock_failed_task():
+@operation
+def mock_failed_task(**_):
     raise RuntimeError
 
 
-def mock_ordered_task(counter):
+@operation
+def mock_ordered_task(counter, **_):
     invocations = global_test_holder.setdefault('invocations', [])
     invocations.append(counter)
 
 
-def mock_conditional_failure_task(failure_count):
+@operation
+def mock_conditional_failure_task(failure_count, **_):
     invocations = global_test_holder.setdefault('invocations', [])
     try:
         if len(invocations) < failure_count:
@@ -395,7 +403,7 @@ def mock_conditional_failure_task(failure_count):
         invocations.append(time.time())
 
 
-def mock_sleep_task(seconds):
+def mock_sleep_task(seconds, **_):
     invocations = global_test_holder.setdefault('invocations', [])
     invocations.append(time.time())
     time.sleep(seconds)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_task.py b/tests/workflows/core/test_task.py
new file mode 100644
index 0000000..e268e44
--- /dev/null
+++ b/tests/workflows/core/test_task.py
@@ -0,0 +1,114 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from datetime import (
+    datetime,
+    timedelta
+)
+
+import pytest
+
+from aria.context import workflow as workflow_context
+from aria.workflows import (
+    api,
+    core,
+    exceptions,
+)
+
+from ... import mock
+
+
+@pytest.fixture
+def ctx():
+    simple_context = mock.context.simple()
+
+    blueprint = mock.models.get_blueprint()
+    deployment = mock.models.get_deployment()
+    node = mock.models.get_dependency_node()
+    node_instance = mock.models.get_dependency_node_instance(node)
+    execution = mock.models.get_execution()
+
+    simple_context.model.blueprint.store(blueprint)
+    simple_context.model.deployment.store(deployment)
+    simple_context.model.node.store(node)
+    simple_context.model.node_instance.store(node_instance)
+    simple_context.model.execution.store(execution)
+
+    return simple_context
+
+
+class TestOperationTask(object):
+
+    def _create_operation_task(self, ctx, node_instance):
+        with workflow_context.current.push(ctx):
+            api_task = api.task.OperationTask(
+                name='ripe',
+                operation_details={'operations': 'aria.tests.workflows.core.test_task.foo'},
+                actor=node_instance,
+            )
+
+            core_task = core.task.OperationTask(api_task=api_task)
+
+        return api_task, core_task
+
+    def test_operation_task_creation(self, ctx):
+        node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
+        api_task, core_task = self._create_operation_task(ctx, node_instance)
+        storage_task = ctx.model.task.get(core_task.id)
+
+        assert core_task.model_context == storage_task
+        assert core_task.name == api_task.name
+        assert core_task.operation_details == api_task.operation_details
+        assert core_task.actor == api_task.actor == node_instance
+        assert core_task.inputs == api_task.inputs == storage_task.inputs
+
+    def test_operation_task_edit_locked_attribute(self, ctx):
+        node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
+
+        _, core_task = self._create_operation_task(ctx, node_instance)
+        now = datetime.utcnow()
+        with pytest.raises(exceptions.TaskException):
+            core_task.status = core_task.STARTED
+        with pytest.raises(exceptions.TaskException):
+            core_task.started_at = now
+        with pytest.raises(exceptions.TaskException):
+            core_task.ended_at = now
+        with pytest.raises(exceptions.TaskException):
+            core_task.retry_count = 2
+        with pytest.raises(exceptions.TaskException):
+            core_task.due_at = now
+
+    def test_operation_task_edit_attributes(self, ctx):
+        node_instance = ctx.model.node_instance.get(mock.models.DEPENDENCY_NODE_INSTANCE_ID)
+
+        _, core_task = self._create_operation_task(ctx, node_instance)
+        future_time = datetime.utcnow() + timedelta(seconds=3)
+
+        with core_task.update():
+            core_task.status = core_task.STARTED
+            core_task.started_at = future_time
+            core_task.ended_at = future_time
+            core_task.retry_count = 2
+            core_task.eta = future_time
+            assert core_task.status != core_task.STARTED
+            assert core_task.started_at != future_time
+            assert core_task.ended_at != future_time
+            assert core_task.retry_count != 2
+            assert core_task.due_at != future_time
+
+        assert core_task.status == core_task.STARTED
+        assert core_task.started_at == future_time
+        assert core_task.ended_at == future_time
+        assert core_task.retry_count == 2
+        assert core_task.eta == future_time

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/workflows/core/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py
index 75e825f..fda36cd 100644
--- a/tests/workflows/core/test_task_graph_into_exececution_graph.py
+++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py
@@ -25,8 +25,12 @@ def test_task_graph_into_execution_graph():
     task_context = mock.context.simple()
     node = mock.models.get_dependency_node()
     node_instance = mock.models.get_dependency_node_instance()
+    deployment = mock.models.get_deployment()
+    execution = mock.models.get_execution()
     task_context.model.node.store(node)
     task_context.model.node_instance.store(node_instance)
+    task_context.model.deployment.store(deployment)
+    task_context.model.execution.store(execution)
 
     def sub_workflow(name, **_):
         return api.task_graph.TaskGraph(name)
@@ -89,7 +93,7 @@ def _assert_execution_is_api_task(execution_task, api_task):
     assert execution_task.id == api_task.id
     assert execution_task.name == api_task.name
     assert execution_task.operation_details == api_task.operation_details
-    assert execution_task.node_instance == api_task.node_instance
+    assert execution_task.actor == api_task.actor
     assert execution_task.inputs == api_task.inputs
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1cea7ef8/tests/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/workflows/executor/test_executor.py b/tests/workflows/executor/test_executor.py
index 0faa753..7b1052b 100644
--- a/tests/workflows/executor/test_executor.py
+++ b/tests/workflows/executor/test_executor.py
@@ -81,15 +81,15 @@ class TestExecutor(object):
             self.executor.close()
 
 
-def mock_successful_task():
+def mock_successful_task(**_):
     pass
 
 
-def mock_failing_task():
+def mock_failing_task(**_):
     raise MockException
 
 
-def mock_task_with_input(input):
+def mock_task_with_input(input, **_):
     raise MockException(input)
 
 if app:
@@ -106,7 +106,7 @@ class MockTask(object):
 
     INFINITE_RETRIES = models.Task.INFINITE_RETRIES
 
-    def __init__(self, func, inputs=None):
+    def __init__(self, func, inputs=None, ctx=None):
         self.states = []
         self.exception = None
         self.id = str(uuid.uuid4())
@@ -116,6 +116,7 @@ class MockTask(object):
         self.logger = logging.getLogger()
         self.name = name
         self.inputs = inputs or {}
+        self.context = ctx or None
         self.retry_count = 0
         self.max_attempts = 1
 


Mime
View raw message