ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: cleaning up
Date Sat, 05 Nov 2016 12:11:29 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-3-api-for-creating-workflows 99b1600ca -> 01f7e7bef


cleaning up


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/01f7e7be
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/01f7e7be
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/01f7e7be

Branch: refs/heads/ARIA-3-api-for-creating-workflows
Commit: 01f7e7bef5ed226a8a9279424ce1fe44bc130732
Parents: 99b1600
Author: mxmrlv <mxmrlv@gmail.com>
Authored: Sat Nov 5 11:39:37 2016 +0200
Committer: mxmrlv <mxmrlv@gmail.com>
Committed: Sat Nov 5 14:11:14 2016 +0200

----------------------------------------------------------------------
 aria/cli/commands.py                            |   2 +-
 aria/context/__init__.py                        |   6 +-
 aria/context/operation.py                       |  45 +++++++-
 aria/context/workflow.py                        |  39 +++----
 aria/decorators.py                              |  10 +-
 aria/workflows/api/__init__.py                  |   2 +
 aria/workflows/api/task.py                      |  53 +++++++---
 aria/workflows/api/task_graph.py                |  24 +++--
 aria/workflows/builtin/execute_operation.py     |  11 +-
 aria/workflows/builtin/heal.py                  |  58 ++++++----
 aria/workflows/core/__init__.py                 |   2 +
 aria/workflows/core/task.py                     | 106 +++++++++++++++++++
 aria/workflows/core/tasks.py                    | 104 ------------------
 aria/workflows/core/translation.py              |  34 +++---
 tests/mock/__init__.py                          |  18 +++-
 tests/mock/context.py                           |  15 +++
 tests/mock/models.py                            |  16 ++-
 tests/mock/operations.py                        |  15 ++-
 tests/workflows/__init__.py                     |   2 +
 tests/workflows/api/test_task.py                |  24 ++---
 tests/workflows/api/test_task_graph.py          |   4 +-
 .../workflows/builtin/test_execute_operation.py |  25 +++--
 tests/workflows/builtin/test_heal.py            |   5 +-
 tests/workflows/builtin/test_install.py         |   3 +-
 tests/workflows/builtin/test_uninstall.py       |   3 +-
 .../test_task_graph_into_exececution_graph.py   |  54 +++++-----
 26 files changed, 418 insertions(+), 262 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index fe21f59..f909a1e 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -29,7 +29,7 @@ from aria import application_model_storage, application_resource_storage
 from aria.logger import LoggerMixin
 from aria.storage import FileSystemModelDriver, FileSystemResourceDriver
 from aria.tools.application import StorageManager
-from aria.context import WorkflowContext
+from aria.context.workflow import WorkflowContext
 from aria.workflows.core.engine import Engine
 from aria.workflows.core.executor import ThreadExecutor
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/context/__init__.py
----------------------------------------------------------------------
diff --git a/aria/context/__init__.py b/aria/context/__init__.py
index 494bea5..20e19db 100644
--- a/aria/context/__init__.py
+++ b/aria/context/__init__.py
@@ -13,4 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from . import workflow
+"""
+Provides contexts to workflow and operation
+"""
+
+from . import workflow, operation

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/context/operation.py b/aria/context/operation.py
index beb2a80..d4d229a 100644
--- a/aria/context/operation.py
+++ b/aria/context/operation.py
@@ -17,11 +17,52 @@
 Workflow and operation contexts
 """
 
+from uuid import uuid4
+
 from aria.logger import LoggerMixin
 
 class OperationContext(LoggerMixin):
     """
-    Context object used during workflow creation and execution
+    Context object used during operation creation and execution
     """
 
-    pass
\ No newline at end of file
+    def __init__(
+            self,
+            name,
+            operation_details,
+            workflow_context,
+            node_instance,
+            inputs=None):
+        super(OperationContext, self).__init__()
+        self.name = name
+        self.id = str(uuid4())
+        self.operation_details = operation_details
+        self.workflow_context = workflow_context
+        self.node_instance = node_instance
+        self.inputs = inputs or {}
+
+    def __repr__(self):
+        details = ', '.join(
+            '{0}={1}'.format(key, value)
+            for key, value in self.operation_details.items())
+        return '{name}({0})'.format(details, name=self.name)
+
+    def __getattr__(self, attr):
+        try:
+            return getattr(self.workflow_context, attr)
+        except AttributeError:
+            return super(OperationContext, self).__getattribute__(attr)
+
+    @property
+    def operation(self):
+        """
+        The model operation
+        """
+        return self.storage.operation.get(self.id)
+
+    @operation.setter
+    def operation(self, value):
+        """
+        Store the operation in the model storage
+        """
+        self.storage.operation.store(value)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/context/workflow.py b/aria/context/workflow.py
index c8dd27f..b5b4e4b 100644
--- a/aria/context/workflow.py
+++ b/aria/context/workflow.py
@@ -21,11 +21,8 @@ import threading
 from uuid import uuid4
 from contextlib import contextmanager
 
-from proxy_tools import proxy
-
 from ..logger import LoggerMixin
 from ..tools.lru_cache import lru_cache
-from ..workflows.api.task_graph import TaskGraph
 
 
 class WorkflowContext(LoggerMixin):
@@ -39,7 +36,8 @@ class WorkflowContext(LoggerMixin):
             resource_storage,
             deployment_id,
             workflow_id,
-            execution_id,
+            # TODO: this should be changed when we decide where the execution is created
+            execution_id=None,
             parameters=None,
             **kwargs):
         super(WorkflowContext, self).__init__(**kwargs)
@@ -59,13 +57,6 @@ class WorkflowContext(LoggerMixin):
                 name=self.__class__.__name__, self=self))
 
     @property
-    def task_graph(self):
-        """
-        The task graph class
-        """
-        return TaskGraph
-
-    @property
     def blueprint_id(self):
         """
         The blueprint id
@@ -150,23 +141,39 @@ class WorkflowContext(LoggerMixin):
         return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path)
 
 
-class CurrentContext(threading.local):
+class _CurrentContext(threading.local):
+    """
+    Provides thread-level context, which sugarcoats the task api.
+    """
 
     def _set(self, value):
-        self._workflow_context = value
+        setattr(self, '_workflow_context', value)
 
     def get(self):
+        """
+        Retrieves the current workflow context
+        :return: the workflow context
+        :rtype: WorkflowContext
+        """
         try:
             return getattr(self, '_workflow_context')
         except AttributeError:
             return None
 
     def clear(self):
+        """
+        Clears the current context
+        """
         if hasattr(self, '_workflow_context'):
             delattr(self, '_workflow_context')
 
     @contextmanager
     def push(self, workflow_context):
+        """
+        Switches the current context to the provided context
+        :param workflow_context: the context to switch to.
+        :yields: the current context
+        """
         if hasattr(self, '_workflow_context'):
             prev_workflow_context = getattr(self, '_workflow_context')
         else:
@@ -177,9 +184,5 @@ class CurrentContext(threading.local):
         finally:
             self._set(prev_workflow_context)
 
-    def get_workflow_context(self):
-        return self.workflow_context
-
-
-current = CurrentContext()
+current = _CurrentContext()
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/decorators.py
----------------------------------------------------------------------
diff --git a/aria/decorators.py b/aria/decorators.py
index a69beb7..5713246 100644
--- a/aria/decorators.py
+++ b/aria/decorators.py
@@ -44,7 +44,7 @@ def workflow(
         workflow_name = _generate_workflow_name(
             func_name=func.__name__,
             suffix_template=suffix_template,
-            context=ctx,
+            ctx=ctx,
             **custom_kwargs)
 
         custom_kwargs.setdefault('ctx', ctx)
@@ -75,15 +75,15 @@ def operation(
     return _wrapper
 
 
-def _generate_workflow_name(func_name, context, suffix_template, **custom_kwargs):
+def _generate_workflow_name(func_name, ctx, suffix_template, **custom_kwargs):
     return '{func_name}.{suffix}'.format(
         func_name=func_name,
-        suffix=suffix_template.format(context=context, **custom_kwargs) or str(uuid4()))
+        suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4()))
 
 
 def _create_func_kwargs(
         kwargs,
-        context,
+        ctx,
         workflow_name=None):
-    kwargs.setdefault('graph', context.task_graph(workflow_name))
+    kwargs.setdefault('graph', ctx.task_graph(workflow_name))
     return kwargs

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/workflows/api/__init__.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/__init__.py b/aria/workflows/api/__init__.py
index ae1e83e..b7f4b0c 100644
--- a/aria/workflows/api/__init__.py
+++ b/aria/workflows/api/__init__.py
@@ -12,3 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from . import task, task_graph

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py
index 66cf9f8..a24fdc4 100644
--- a/aria/workflows/api/task.py
+++ b/aria/workflows/api/task.py
@@ -13,48 +13,71 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+"""
+Provides the tasks to be entered into the task graph
+"""
 from uuid import uuid4
 
 from aria import context
 
 
 class BaseTask(object):
+    """
+    Abstract task_graph task
+    """
     pass
 
 
 class OperationTask(BaseTask):
+    """
+    Represents an operation task in the task_graph
+    """
 
     def __init__(self,
                  name,
                  operation_details,
                  node_instance,
                  **operation_kwargs):
-        self._workflow_context = context.workflow.current.get()
-        self._task_model = self._workflow_context.model.task.model_cls(
-            name=name,
-            operation_details=operation_details,
-            node_instance=node_instance,
-            execution_id=self._workflow_context.execution_id,
-            **operation_kwargs
-        )
-        self._workflow_context.model.task.store(self._task_model)
+        """
+        Creates an operation task using the name, details, node instance and any additional kwargs.
+        :param name: the name of the operation.
+        :param operation_details: the details for the operation.
+        :param node_instance: the node instance on which this operation is registered.
+        :param operation_kwargs: any additional kwargs for the operation.
+        """
+        self.id = uuid4()
+        self.workflow_context = context.workflow.current.get()
+        self.name = name
+        self.operation_details = operation_details
+        self.node_instance = node_instance
+        self.operation_kwargs = operation_kwargs
 
     def __getattr__(self, item):
-        return getattr(self._workflow_context.model.task.get(self._task_model.id), item)
+        try:
+            return self.__getattribute__(item)
+        except AttributeError:
+            return getattr(self._workflow_context.model.task.get(self._task_model.id), item)
 
 
 class WorkflowTask(BaseTask):
-
+    """
+    Represents a workflow task in the task_graph
+    """
     def __init__(self, workflow_func, **kwargs):
+        """
+        Creates a workflow task using the workflow_func provided and its kwargs
+        :param workflow_func: the function to run
+        :param kwargs: the kwargs that would be passed to the workflow_func
+        """
         kwargs.setdefault('ctx', context.workflow.current.get())
         self._graph = workflow_func(**kwargs)
 
     @property
-    def id(self):
-        return self._id
-
-    @property
     def graph(self):
+        """
+        The graph constructed by the sub workflow
+        :return:
+        """
         return self._graph
 
     def __getattr__(self, item):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/workflows/api/task_graph.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/task_graph.py b/aria/workflows/api/task_graph.py
index 3d75c10..42cf8b6 100644
--- a/aria/workflows/api/task_graph.py
+++ b/aria/workflows/api/task_graph.py
@@ -22,6 +22,8 @@ from collections import Iterable
 
 from networkx import DiGraph, topological_sort
 
+from . import task
+
 
 class TaskNotInGraphError(Exception):
     """
@@ -130,14 +132,15 @@ class TaskGraph(object):
         :return: A list of added tasks
         :rtype: list
         """
+        # assert all([isinstance(t, (task.BaseTask, Iterable)) for t in tasks])
         return_tasks = []
 
-        for task in tasks:
-            if isinstance(task, Iterable):
-                return_tasks += self.add_tasks(*task)
-            elif not self.has_tasks(task):
-                self._graph.add_node(task.id, task=task)
-                return_tasks.append(task)
+        for t in tasks:
+            if isinstance(t, Iterable):
+                return_tasks += self.add_tasks(*t)
+            elif not self.has_tasks(t):
+                self._graph.add_node(t.id, task=t)
+                return_tasks.append(t)
 
         return return_tasks
 
@@ -168,13 +171,14 @@ class TaskGraph(object):
         :return: True if all tasks are in the graph, otherwise True
         :rtype: list
         """
+        # assert all(isinstance(t, (task.BaseTask, Iterable)) for t in tasks)
         return_value = True
 
-        for task in tasks:
-            if isinstance(task, Iterable):
-                return_value &= self.has_tasks(*task)
+        for t in tasks:
+            if isinstance(t, Iterable):
+                return_value &= self.has_tasks(*t)
             else:
-                return_value &= self._graph.has_node(task.id)
+                return_value &= self._graph.has_node(t.id)
 
         return return_value
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/workflows/builtin/execute_operation.py
----------------------------------------------------------------------
diff --git a/aria/workflows/builtin/execute_operation.py b/aria/workflows/builtin/execute_operation.py
index dd43a29..ddbb8e7 100644
--- a/aria/workflows/builtin/execute_operation.py
+++ b/aria/workflows/builtin/execute_operation.py
@@ -67,11 +67,12 @@ def execute_operation(
 
     # registering actual tasks to sequences
     for node_instance in filtered_node_instances:
-        graph.add_tasks(execute_operation_on_instance(
-            node_instance=node_instance,
-            operation=operation,
-            operation_kwargs=operation_kwargs,
-            allow_kwargs_override=allow_kwargs_override
+        graph.add_tasks(
+            execute_operation_on_instance(
+                node_instance=node_instance,
+                operation=operation,
+                operation_kwargs=operation_kwargs,
+                allow_kwargs_override=allow_kwargs_override
             )
         )
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/workflows/builtin/heal.py
----------------------------------------------------------------------
diff --git a/aria/workflows/builtin/heal.py b/aria/workflows/builtin/heal.py
index 7a0c270..2f3d538 100644
--- a/aria/workflows/builtin/heal.py
+++ b/aria/workflows/builtin/heal.py
@@ -19,13 +19,8 @@ Builtin heal workflow
 
 from aria import workflow
 
-from .uninstall import uninstall
-from .install import install
-from .workflows import relationship_tasks
-from ..api import task
-
-
-# TODO: reimplement the heal, so that it doesn't use the install/uninstall workflows (since they are infact no subworkflows)
+from .workflows import relationship_tasks, install_node_instance, uninstall_node_instance
+from ..api import task, task_graph
 
 
 @workflow
@@ -33,7 +28,7 @@ def heal(ctx, graph, node_instance_id):
     """
     The heal workflow
 
-    :param WorkflowContext context: the workflow context
+    :param WorkflowContext ctx: the workflow context
     :param TaskGraph graph: the graph which will describe the workflow.
     :param node_instance_id: the id of the node instance to heal
     :return:
@@ -77,18 +72,27 @@ def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances):
 
     # Create install stub workflow for each unaffected node instance
     for node_instance in targeted_node_instances:
-        node_instance_sub_workflow = ctx.task_graph(
+        node_instance_sub_workflow = task_graph.TaskGraph(
             name='uninstall_stub_{0}'.format(node_instance.id))
         node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
         graph.add_tasks(node_instance_sub_workflow)
 
     # Create install sub workflow for each failing node on the same graph
-    graph = uninstall(
-        ctx=ctx,
-        graph=graph,
-        node_instances=failing_node_instances,
-        node_instance_sub_workflows=node_instance_sub_workflows
-    )
+    node_instance_sub_workflows = node_instance_sub_workflows or {}
+
+    # create uninstall sub workflow for every node instance
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = task.WorkflowTask(uninstall_node_instance,
+                                                       node_instance=node_instance)
+        node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
+        graph.add_tasks(node_instance_sub_workflow)
+
+    # create dependencies between the node instance sub workflow
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
+        for relationship_instance in reversed(node_instance.relationship_instances):
+            graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id],
+                                 node_instance_sub_workflow)
 
     # Add operations for intact nodes depending on a node instance belonging to node_instances
     for node_instance in targeted_node_instances:
@@ -125,18 +129,28 @@ def heal_install(ctx, graph, failing_node_instances, targeted_node_instances):
 
     # Create install sub workflow for each unaffected
     for node_instance in targeted_node_instances:
-        node_instance_sub_workflow = ctx.task_graph(
+        node_instance_sub_workflow = task_graph.TaskGraph(
             name='install_stub_{0}'.format(node_instance.id))
         node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
         graph.add_tasks(node_instance_sub_workflow)
 
     # create install sub workflow for each failing node on the same graph
-    graph = install(
-        ctx=ctx,
-        graph=graph,
-        node_instances=failing_node_instances,
-        node_instance_sub_workflows=node_instance_sub_workflows
-    )
+    node_instance_sub_workflows = node_instance_sub_workflows or {}
+
+    # create install sub workflow for every node instance
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = task.WorkflowTask(install_node_instance,
+                                                       node_instance=node_instance)
+        node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
+        graph.add_tasks(node_instance_sub_workflow)
+
+    # create dependencies between the node instance sub workflow
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
+        if node_instance.relationship_instances:
+            dependencies = [node_instance_sub_workflows[relationship_instance.target_id]
+                            for relationship_instance in node_instance.relationship_instances]
+            graph.add_dependency(node_instance_sub_workflow, dependencies)
 
     # Add operations for intact nodes depending on a node instance
     # belonging to node_instances

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/__init__.py b/aria/workflows/core/__init__.py
index ae1e83e..08ccff7 100644
--- a/aria/workflows/core/__init__.py
+++ b/aria/workflows/core/__init__.py
@@ -12,3 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from . import translation, task

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py
new file mode 100644
index 0000000..de8a561
--- /dev/null
+++ b/aria/workflows/core/task.py
@@ -0,0 +1,106 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow tasks
+"""
+
+
+class BaseTask(object):
+    """
+    Base class for Task objects
+    """
+
+    def __init__(self, id):
+        self._id = id
+
+    @property
+    def id(self):
+        """
+        :return: the task's id
+        """
+        return self._id
+
+
+class StartWorkflowTask(BaseTask):
+    """
+    Tasks marking a workflow start
+    """
+    pass
+
+
+class EndWorkflowTask(BaseTask):
+    """
+    Tasks marking a workflow end
+    """
+    pass
+
+
+class StartSubWorkflowTask(BaseTask):
+    """
+    Tasks marking a subworkflow start
+    """
+    pass
+
+
+class EndSubWorkflowTask(BaseTask):
+    """
+    Tasks marking a subworkflow end
+    """
+    pass
+
+
+class OperationTask(BaseTask):
+    """
+    Operation tasks
+    """
+
+    def __init__(self, api_task, *args, **kwargs):
+        self._api_task = api_task
+        self._ctx = api_task.workflow_context
+        super(OperationTask, self).__init__(id=api_task.id)
+        self._create_operation_in_storage()
+
+    @property
+    def ctx(self):
+        """
+        :return: the workflow context of the wrapped api task
+        """
+        return self._ctx
+
+    @property
+    def api_task(self):
+        """
+        :return: the api task wrapped by this task
+        """
+        return self._api_task
+
+    def _create_operation_in_storage(self):
+        task = self.ctx.model.task.model_cls(
+            name=self.api_task.name,
+            operation_details=self.api_task.operation_details,
+            node_instance=self.api_task.node_instance,
+            inputs=self.api_task.operation_kwargs,
+
+            execution_id=self.ctx.execution_id,
+            max_retries=self.ctx.parameters.get('max_retries', 1),
+        )
+        self.ctx.model.task.store(task)
+
+    def __getattr__(self, attr):
+        try:
+            return super(OperationTask, self).__getattribute__(attr)
+        except AttributeError:
+            return getattr(self.context, attr)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/workflows/core/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py
deleted file mode 100644
index 76ae609..0000000
--- a/aria/workflows/core/tasks.py
+++ /dev/null
@@ -1,104 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow tasks
-"""
-
-
-class BaseTask(object):
-    """
-    Base class for Task objects
-    """
-
-    def __init__(self, id, name, context):
-        self._id = id
-        self._name = name
-        self._context = context
-
-    @property
-    def id(self):
-        """
-        :return: the task's id
-        """
-        return self._id
-
-    @property
-    def name(self):
-        """
-        :return: the task's name
-        """
-        return self._name
-
-    @property
-    def context(self):
-        """
-        :return: the task's context
-        """
-        return self._context
-
-
-class StartWorkflowTask(BaseTask):
-    """
-    Tasks marking a workflow start
-    """
-    pass
-
-
-class EndWorkflowTask(BaseTask):
-    """
-    Tasks marking a workflow end
-    """
-    pass
-
-
-class StartSubWorkflowTask(BaseTask):
-    """
-    Tasks marking a subworkflow start
-    """
-    pass
-
-
-class EndSubWorkflowTask(BaseTask):
-    """
-    Tasks marking a subworkflow end
-    """
-    pass
-
-
-class OperationTask(BaseTask):
-    """
-    Operation tasks
-    """
-
-    def __init__(self, *args, **kwargs):
-        super(OperationTask, self).__init__(*args, **kwargs)
-        self._create_operation_in_storage()
-
-    def _create_operation_in_storage(self):
-        operation_cls = self.context.storage.operation.model_cls
-        operation = operation_cls(
-            id=self.context.id,
-            execution_id=self.context.execution_id,
-            max_retries=self.context.parameters.get('max_retries', 1),
-            status=operation_cls.PENDING,
-        )
-        self.context.operation = operation
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.context, attr)
-        except AttributeError:
-            return super(OperationTask, self).__getattribute__(attr)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/aria/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/translation.py b/aria/workflows/core/translation.py
index 29d5afa..f74d802 100644
--- a/aria/workflows/core/translation.py
+++ b/aria/workflows/core/translation.py
@@ -19,15 +19,15 @@ Translation of user graph's API to the execution graph
 
 from aria import context
 
-from . import tasks
+from .. import api
+from . import task
 
 
 def build_execution_graph(
         task_graph,
-        workflow_context,
         execution_graph,
-        start_cls=tasks.StartWorkflowTask,
-        end_cls=tasks.EndWorkflowTask,
+        start_cls=task.StartWorkflowTask,
+        end_cls=task.EndWorkflowTask,
         depends_on=()):
     """
     Translates the user graph to the execution graph
@@ -39,30 +39,27 @@ def build_execution_graph(
     :param depends_on: internal use
     """
     # Insert start marker
-    start_task = start_cls(id=_start_graph_suffix(task_graph.id),
-                           name=_start_graph_suffix(task_graph.name),
-                           context=workflow_context)
+    start_task = start_cls(id=_start_graph_suffix(task_graph.id))
     _add_task_and_dependencies(execution_graph, start_task, depends_on)
 
-    for task in task_graph.topological_order(reverse=True):
-        dependencies = task_graph.get_dependencies(task)
+    for api_task in task_graph.topological_order(reverse=True):
+        dependencies = task_graph.get_dependencies(api_task)
         operation_dependencies = _get_tasks_from_dependencies(
             execution_graph,
             dependencies,
             default=[start_task])
 
-        if _is_operation(task):
+        if _is_operation(api_task):
             # Add the task an the dependencies
-            operation_task = tasks.OperationTask(id=task.id, name=task.name, context=task)
+            operation_task = task.OperationTask(api_task)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
         else:
             # Built the graph recursively while adding start and end markers
             build_execution_graph(
-                task_graph=task,
-                workflow_context=workflow_context,
+                task_graph=api_task,
                 execution_graph=execution_graph,
-                start_cls=tasks.StartSubWorkflowTask,
-                end_cls=tasks.EndSubWorkflowTask,
+                start_cls=task.StartSubWorkflowTask,
+                end_cls=task.EndSubWorkflowTask,
                 depends_on=operation_dependencies
             )
 
@@ -71,10 +68,7 @@ def build_execution_graph(
         execution_graph,
         _get_non_dependency_tasks(task_graph),
         default=[start_task])
-    end_task = end_cls(
-        id=_end_graph_suffix(task_graph.id),
-        name=_end_graph_suffix(task_graph.name),
-        context=workflow_context)
+    end_task = end_cls(id=_end_graph_suffix(task_graph.id))
     _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies)
 
 
@@ -94,7 +88,7 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
 
 
 def _is_operation(task):
-    return isinstance(task, context.OperationContext)
+    return isinstance(task, api.task.OperationTask)
 
 
 def _start_graph_suffix(id):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/mock/__init__.py
----------------------------------------------------------------------
diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py
index b85014d..14541d0 100644
--- a/tests/mock/__init__.py
+++ b/tests/mock/__init__.py
@@ -1,4 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
-
-
-from . import models, context, operations
\ No newline at end of file
+from . import models, context, operations

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 9ca28cb..71df67c 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 from aria import context, application_model_storage
 
 from ..storage import InMemoryModelDriver

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index feb72d7..633adbb 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -1,3 +1,17 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
 from datetime import datetime
 
@@ -115,4 +129,4 @@ def get_deployment():
         updated_at=now,
         blueprint_id=BLUEPRINT_ID,
         workflows={}
-    )
\ No newline at end of file
+    )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/mock/operations.py
----------------------------------------------------------------------
diff --git a/tests/mock/operations.py b/tests/mock/operations.py
index d45c2bb..407061f 100644
--- a/tests/mock/operations.py
+++ b/tests/mock/operations.py
@@ -1,4 +1,17 @@
-
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
 
 NODE_OPERATIONS_INSTALL = [
     'aria.interfaces.lifecycle.create',

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/workflows/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/__init__.py b/tests/workflows/__init__.py
index ae1e83e..fe04b2f 100644
--- a/tests/workflows/__init__.py
+++ b/tests/workflows/__init__.py
@@ -12,3 +12,5 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from . import api, builtin, core

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/test_task.py b/tests/workflows/api/test_task.py
index 62d79b9..0edacf7 100644
--- a/tests/workflows/api/test_task.py
+++ b/tests/workflows/api/test_task.py
@@ -86,19 +86,17 @@ class TestOperationTask(object):
                                             max_retries=max_retries,
                                             retry_count=retry_count)
 
-            storage_task = workflow_context.model.task.get(model_task.id)
-            assert storage_task.name == model_task.name == name
-            assert storage_task.operation_details == model_task.operation_details == op_details
-            assert storage_task.node_instance == model_task.node_instance == node_instance
-            assert storage_task.inputs == model_task.inputs == inputs
-            assert storage_task.id == model_task.id == task_id
-            assert storage_task.status == model_task.status == status
-            assert storage_task.execution_id == model_task.execution_id == workflow_context.execution_id
-            assert storage_task.eta == model_task.eta == eta
-            assert storage_task.started_at == model_task.started_at == started_at
-            assert storage_task.ended_at == model_task.ended_at == ended_at
-            assert storage_task.max_retries == model_task.max_retries == max_retries
-            assert storage_task.retry_count == model_task.retry_count == retry_count
+        assert model_task.name == name
+        assert model_task.operation_details == op_details
+        assert model_task.node_instance == node_instance
+        assert model_task.operation_kwargs['inputs'] == inputs
+        assert model_task.operation_kwargs['id'] == task_id
+        assert model_task.operation_kwargs['status'] == status
+        assert model_task.operation_kwargs['eta'] == eta
+        assert model_task.operation_kwargs['started_at'] == started_at
+        assert model_task.operation_kwargs['ended_at'] == ended_at
+        assert model_task.operation_kwargs['max_retries'] == max_retries
+        assert model_task.operation_kwargs['retry_count'] == retry_count
 
 
 class TestWorkflowTask(object):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/workflows/api/test_task_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/api/test_task_graph.py b/tests/workflows/api/test_task_graph.py
index bd960ac..7d409cf 100644
--- a/tests/workflows/api/test_task_graph.py
+++ b/tests/workflows/api/test_task_graph.py
@@ -17,10 +17,10 @@ from uuid import uuid4
 
 import pytest
 
-from aria.workflows.api import task_graph
+from aria.workflows.api import task_graph, task
 
 
-class MockTask(object):
+class MockTask(task.BaseTask):
     def __init__(self):
         self.id = str(uuid4())
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/workflows/builtin/test_execute_operation.py
----------------------------------------------------------------------
diff --git a/tests/workflows/builtin/test_execute_operation.py b/tests/workflows/builtin/test_execute_operation.py
index f44b0e2..82010c8 100644
--- a/tests/workflows/builtin/test_execute_operation.py
+++ b/tests/workflows/builtin/test_execute_operation.py
@@ -15,6 +15,7 @@
 
 import pytest
 
+from aria.workflows.api import task
 from aria.workflows.builtin.execute_operation import execute_operation
 
 from ... import mock
@@ -52,16 +53,20 @@ def ctx():
 def test_execute_operation(ctx):
     operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
     node_instance_id = 'dependency_node_instance'
-    execute_tasks = list(execute_operation(
-        ctx=ctx,
-        operation=operation_name,
-        operation_kwargs={},
-        allow_kwargs_override=False,
-        run_by_dependency_order=False,
-        type_names=[],
-        node_ids=[],
-        node_instance_ids=[node_instance_id]
-    ).topological_order())
+
+    execute_tasks = list(
+        task.WorkflowTask(
+            execute_operation,
+            ctx=ctx,
+            operation=operation_name,
+            operation_kwargs={},
+            allow_kwargs_override=False,
+            run_by_dependency_order=False,
+            type_names=[],
+            node_ids=[],
+            node_instance_ids=[node_instance_id]
+        ).topological_order()
+    )
 
     assert len(execute_tasks) == 1
     assert execute_tasks[0].name == '{0}.{1}'.format(node_instance_id, operation_name)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/workflows/builtin/test_heal.py
----------------------------------------------------------------------
diff --git a/tests/workflows/builtin/test_heal.py b/tests/workflows/builtin/test_heal.py
index 28f0c09..4b9ae09 100644
--- a/tests/workflows/builtin/test_heal.py
+++ b/tests/workflows/builtin/test_heal.py
@@ -15,6 +15,7 @@
 
 import pytest
 
+from aria.workflows.api import task
 from aria.workflows.builtin.heal import heal
 
 from . import (assert_node_install_operations,
@@ -52,7 +53,7 @@ def ctx():
 
 
 def test_heal_dependent_node(ctx):
-    heal_graph = heal(ctx, node_instance_id='dependent_node_instance')
+    heal_graph = task.WorkflowTask(heal, ctx=ctx, node_instance_id='dependent_node_instance')
 
     assert len(list(heal_graph.tasks)) == 2
     uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True))
@@ -82,7 +83,7 @@ def test_heal_dependent_node(ctx):
 
 
 def test_heal_dependency_node(ctx):
-    heal_graph = heal(ctx, node_instance_id='dependency_node_instance')
+    heal_graph = task.WorkflowTask(heal, ctx=ctx, node_instance_id='dependency_node_instance')
     # both subgraphs should contain uninstall/install for both the dependent and the dependency
     assert len(list(heal_graph.tasks)) == 2
     uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/workflows/builtin/test_install.py
----------------------------------------------------------------------
diff --git a/tests/workflows/builtin/test_install.py b/tests/workflows/builtin/test_install.py
index 31d0a4c..d27ece4 100644
--- a/tests/workflows/builtin/test_install.py
+++ b/tests/workflows/builtin/test_install.py
@@ -16,6 +16,7 @@
 import pytest
 
 from aria.workflows.builtin.install import install
+from aria.workflows.api import task
 
 from . import assert_node_install_operations
 from ... import mock
@@ -51,7 +52,7 @@ def ctx():
 
 
 def test_install(ctx):
-    install_tasks = list(install(ctx).topological_order(reverse=True))
+    install_tasks = list(task.WorkflowTask(install, ctx=ctx).topological_order(True))
 
     assert len(install_tasks) == 2
     dependency_node_subgraph, dependent_node_subgraph = install_tasks

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/workflows/builtin/test_uninstall.py
----------------------------------------------------------------------
diff --git a/tests/workflows/builtin/test_uninstall.py b/tests/workflows/builtin/test_uninstall.py
index 360a24a..cee7998 100644
--- a/tests/workflows/builtin/test_uninstall.py
+++ b/tests/workflows/builtin/test_uninstall.py
@@ -15,6 +15,7 @@
 
 import pytest
 
+from aria.workflows.api import task
 from aria.workflows.builtin.uninstall import uninstall
 
 from . import assert_node_uninstall_operations
@@ -51,7 +52,7 @@ def ctx():
 
 
 def test_uninstall(ctx):
-    uninstall_tasks = list(uninstall(ctx).topological_order(reverse=True))
+    uninstall_tasks = list(task.WorkflowTask(uninstall, ctx=ctx).topological_order(True))
 
     assert len(uninstall_tasks) == 2
     dependent_node_subgraph, dependency_node_subgraph = uninstall_tasks

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/01f7e7be/tests/workflows/core/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py
index 7c2ea1b..fe1624a 100644
--- a/tests/workflows/core/test_task_graph_into_exececution_graph.py
+++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py
@@ -13,28 +13,32 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import pytest
 from networkx import topological_sort, DiGraph
 
 from aria import context
-from aria.workflows.api import task_graph
-from aria.workflows.core import tasks, translation
+from aria.workflows import api, core
 
-
-@pytest.fixture(autouse=True)
-def no_storage(monkeypatch):
-    monkeypatch.setattr(tasks.OperationTask, '_create_operation_in_storage',
-                        value=lambda *args, **kwargs: None)
+from ... import mock
 
 
 def test_task_graph_into_execution_graph():
-    test_task_graph = task_graph.TaskGraph('test_task_graph')
-    simple_before_task = context.OperationContext('test_simple_before_task', {}, {}, None)
-    simple_after_task = context.OperationContext('test_simple_after_task', {}, {}, None)
+    task_context = mock.context.simple()
+    node = mock.models.get_dependency_node()
+    node_instance = mock.models.get_dependency_node_instance()
+    task_context.model.node.store(node)
+    task_context.model.node_instance.store(node_instance)
+
+    def sub_workflow(name, **_):
+        return api.task_graph.TaskGraph(name)
+
+    with context.workflow.current.push(task_context):
+        test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
+        simple_before_task = api.task.OperationTask('test_simple_before_task', {}, node_instance)
+        simple_after_task = api.task.OperationTask('test_simple_after_task', {}, node_instance)
 
-    inner_task_graph = task_graph.TaskGraph('test_inner_task_graph')
-    inner_task = context.OperationContext('test_inner_task', {}, {}, None)
-    inner_task_graph.add_tasks(inner_task)
+        inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph')
+        inner_task = api.task.OperationTask('test_inner_task', {}, node_instance)
+        inner_task_graph.add_tasks(inner_task)
 
     test_task_graph.add_tasks(simple_before_task)
     test_task_graph.add_tasks(simple_after_task)
@@ -44,9 +48,8 @@ def test_task_graph_into_execution_graph():
 
     # Direct check
     execution_graph = DiGraph()
-    translation.build_execution_graph(task_graph=test_task_graph,
-                                      workflow_context=None,
-                                      execution_graph=execution_graph)
+    core.translation.build_execution_graph(task_graph=test_task_graph,
+                                           execution_graph=execution_graph)
     execution_tasks = topological_sort(execution_graph)
 
     assert len(execution_tasks) == 7
@@ -64,15 +67,16 @@ def test_task_graph_into_execution_graph():
     assert expected_tasks_names == execution_tasks
 
     assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
-                      tasks.StartWorkflowTask)
-    assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context
+                      core.task.StartWorkflowTask)
+    assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).api_task
     assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
-                      tasks.StartSubWorkflowTask)
-    assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), tasks.
-                      EndSubWorkflowTask)
-    assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), tasks.EndWorkflowTask)
+                      core.task.StartSubWorkflowTask)
+    assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).api_task
+    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
+                      core.task.EndSubWorkflowTask)
+    assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).api_task
+    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph),
+                      core.task.EndWorkflowTask)
 
 
 def _get_task_by_name(task_name, graph):


Mime
View raw message