ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject [7/8] incubator-ariatosca git commit: ARIA-3 Create an api for building workflows
Date Tue, 08 Nov 2016 10:16:36 GMT
ARIA-3 Create an api for building workflows

ARIA-4 Create an API for the task graph
ARIA-5 Adapt workflow API uses to modified API

An API for creating workflows. Users can build graphs of tasks and
set dependencies in between tasks to execute them in a specific
order.

Additional changes:
- Remodeling for engine and user tasks.
- Remodeling for Operation into Task in the storage.
- Minimal reorganization of a few test modules, so they are now
  using the same file system hierarchy as the modules which they test.


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8947f72c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8947f72c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8947f72c

Branch: refs/heads/ARIA-9-API-for-operation-context
Commit: 8947f72cb744a9b9fe79c80ebd2f2ef7fbb68251
Parents: 47eaf04
Author: Ran Ziv <ran@gigaspaces.com>
Authored: Fri Oct 21 13:21:25 2016 +0300
Committer: mxmrlv <mxmrlv@gmail.com>
Committed: Mon Nov 7 18:07:43 2016 +0200

----------------------------------------------------------------------
 .travis.yml                                     |   1 +
 aria/.pylintrc                                  |  17 +-
 aria/__init__.py                                |   2 +-
 aria/cli/commands.py                            |   2 +-
 aria/context/__init__.py                        |  20 +
 aria/context/operation.py                       |  68 ++
 aria/context/workflow.py                        | 190 +++++
 aria/contexts.py                                | 211 ------
 aria/decorators.py                              |  49 +-
 aria/events/builtin_event_handler.py            |  30 +-
 aria/events/workflow_engine_event_handler.py    |  20 +-
 aria/logger.py                                  |   6 +-
 aria/storage/models.py                          |  62 +-
 aria/tools/lru_cache.py                         |   6 +-
 aria/workflows/api/__init__.py                  |   6 +
 aria/workflows/api/task.py                      | 109 +++
 aria/workflows/api/task_graph.py                | 290 ++++++++
 aria/workflows/api/tasks_graph.py               | 203 -----
 aria/workflows/builtin/execute_operation.py     |  34 +-
 aria/workflows/builtin/heal.py                  | 163 ++--
 aria/workflows/builtin/install.py               |  23 +-
 aria/workflows/builtin/uninstall.py             |  17 +-
 aria/workflows/builtin/workflows.py             | 144 ++--
 aria/workflows/core/__init__.py                 |   6 +
 aria/workflows/core/engine.py                   |  19 +-
 aria/workflows/core/task.py                     | 200 +++++
 aria/workflows/core/tasks.py                    | 121 ---
 aria/workflows/core/translation.py              |  45 +-
 aria/workflows/exceptions.py                    |   7 +
 aria/workflows/executor/blocking.py             |   5 +-
 aria/workflows/executor/celery.py               |   5 +-
 aria/workflows/executor/multiprocess.py         |   7 +-
 aria/workflows/executor/thread.py               |   6 +-
 requirements.txt                                |   2 +
 setup.py                                        |   4 -
 tests/.pylintrc                                 |  21 +-
 tests/mock/__init__.py                          |  16 +
 tests/mock/context.py                           |  32 +
 tests/mock/models.py                            | 132 ++++
 tests/mock/operations.py                        |  33 +
 tests/test_logger.py                            |   2 +-
 tests/workflows/__init__.py                     |   2 +
 tests/workflows/api/__init__.py                 |  15 +
 tests/workflows/api/test_task.py                |  98 +++
 tests/workflows/api/test_task_graph.py          | 745 +++++++++++++++++++
 tests/workflows/builtin/__init__.py             |  86 +++
 .../workflows/builtin/test_execute_operation.py |  51 ++
 tests/workflows/builtin/test_heal.py            |  88 +++
 tests/workflows/builtin/test_install.py         |  39 +
 tests/workflows/builtin/test_uninstall.py       |  39 +
 tests/workflows/core/__init__.py                |  14 +
 tests/workflows/core/test_executor.py           | 136 ++++
 .../test_task_graph_into_exececution_graph.py   |  97 +++
 tests/workflows/test_engine.py                  |  41 +-
 tests/workflows/test_executor.py                | 136 ----
 .../test_task_graph_into_exececution_graph.py   |  79 --
 tox.ini                                         |   4 +-
 57 files changed, 2894 insertions(+), 1112 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/.travis.yml
----------------------------------------------------------------------
diff --git a/.travis.yml b/.travis.yml
index 8a54504..5413ff2 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -21,6 +21,7 @@ env:
 - TOX_ENV=py26
 install:
   - pip install --upgrade pip
+  - pip install --upgrade setuptools
   - pip install tox
 script:
   - pip --version

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/.pylintrc
----------------------------------------------------------------------
diff --git a/aria/.pylintrc b/aria/.pylintrc
index eb188a0..e5ee9de 100644
--- a/aria/.pylintrc
+++ b/aria/.pylintrc
@@ -1,3 +1,18 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
 [MASTER]
 
 # Python code to execute, usually for sys.path manipulation such as
@@ -62,7 +77,7 @@ confidence=
 # --enable=similarities". If you want to run only the classes checker, but have
 # no Warning level messages displayed, use"--disable=all --enable=classes
 # --disable=W"
-disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,logging-format-interpolation
+disable=import-star-module-level,old-octal-literal,oct-method,print-statement,unpacking-in-except,parameter-unpacking,backtick,old-raise-syntax,old-ne-operator,long-suffix,dict-view-method,dict-iter-method,metaclass-assignment,next-method-called,raising-string,indexing-exception,raw_input-builtin,long-builtin,file-builtin,execfile-builtin,coerce-builtin,cmp-builtin,buffer-builtin,basestring-builtin,apply-builtin,filter-builtin-not-iterating,using-cmp-argument,useless-suppression,range-builtin-not-iterating,suppressed-message,no-absolute-import,old-division,cmp-method,reload-builtin,zip-builtin-not-iterating,intern-builtin,unichr-builtin,reduce-builtin,standarderror-builtin,unicode-builtin,xrange-builtin,coerce-method,delslice-method,getslice-method,setslice-method,input-builtin,round-builtin,hex-method,nonzero-method,map-builtin-not-iterating,redefined-builtin,logging-format-interpolation,import-error
 
 [REPORTS]
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index 4d9b0f4..eca7b9b 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -56,7 +56,7 @@ def application_model_storage(driver):
                 models.DeploymentModification,
                 models.Execution,
                 models.ProviderContext,
-                models.Operation,
+                models.Task,
             ])
     return _model_storage[driver]
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index ddc27b5..d3698fd 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -29,7 +29,7 @@ from aria import application_model_storage, application_resource_storage
 from aria.logger import LoggerMixin
 from aria.storage import FileSystemModelDriver, FileSystemResourceDriver
 from aria.tools.application import StorageManager
-from aria.contexts import WorkflowContext
+from aria.context.workflow import WorkflowContext
 from aria.workflows.core.engine import Engine
 from aria.workflows.executor.thread import ThreadExecutor
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/context/__init__.py
----------------------------------------------------------------------
diff --git a/aria/context/__init__.py b/aria/context/__init__.py
new file mode 100644
index 0000000..20e19db
--- /dev/null
+++ b/aria/context/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Provides contexts to workflow and operation
+"""
+
+from . import workflow, operation

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/context/operation.py b/aria/context/operation.py
new file mode 100644
index 0000000..d4d229a
--- /dev/null
+++ b/aria/context/operation.py
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow and operation contexts
+"""
+
+from uuid import uuid4
+
+from aria.logger import LoggerMixin
+
+class OperationContext(LoggerMixin):
+    """
+    Context object used during operation creation and execution
+    """
+
+    def __init__(
+            self,
+            name,
+            operation_details,
+            workflow_context,
+            node_instance,
+            inputs=None):
+        super(OperationContext, self).__init__()
+        self.name = name
+        self.id = str(uuid4())
+        self.operation_details = operation_details
+        self.workflow_context = workflow_context
+        self.node_instance = node_instance
+        self.inputs = inputs or {}
+
+    def __repr__(self):
+        details = ', '.join(
+            '{0}={1}'.format(key, value)
+            for key, value in self.operation_details.items())
+        return '{name}({0})'.format(details, name=self.name)
+
+    def __getattr__(self, attr):
+        try:
+            return getattr(self.workflow_context, attr)
+        except AttributeError:
+            return super(OperationContext, self).__getattribute__(attr)
+
+    @property
+    def operation(self):
+        """
+        The model operation
+        """
+        return self.storage.operation.get(self.id)
+
+    @operation.setter
+    def operation(self, value):
+        """
+        Store the operation in the model storage
+        """
+        self.storage.operation.store(value)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/context/workflow.py b/aria/context/workflow.py
new file mode 100644
index 0000000..8183d42
--- /dev/null
+++ b/aria/context/workflow.py
@@ -0,0 +1,190 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow and operation contexts
+"""
+
+import threading
+from uuid import uuid4
+from contextlib import contextmanager
+
+from .. import logger
+from ..tools.lru_cache import lru_cache
+from .. import exceptions
+
+
+class ContextException(exceptions.AriaError):
+    """
+    Context based exception
+    """
+    pass
+
+
+class WorkflowContext(logger.LoggerMixin):
+    """
+    Context object used during workflow creation and execution
+    """
+
+    def __init__(
+            self,
+            name,
+            model_storage,
+            resource_storage,
+            deployment_id,
+            workflow_id,
+            execution_id=None,
+            parameters=None,
+            **kwargs):
+        super(WorkflowContext, self).__init__(**kwargs)
+        self.name = name
+        self.id = str(uuid4())
+        self.model = model_storage
+        self.resource = resource_storage
+        self.deployment_id = deployment_id
+        self.workflow_id = workflow_id
+        self.execution_id = execution_id or str(uuid4())
+        self.parameters = parameters or {}
+
+    def __repr__(self):
+        return (
+            '{name}(deployment_id={self.deployment_id}, '
+            'workflow_id={self.workflow_id}, '
+            'execution_id={self.execution_id})'.format(
+                name=self.__class__.__name__, self=self))
+
+    @property
+    def blueprint_id(self):
+        """
+        The blueprint id
+        """
+        return self.deployment.blueprint_id
+
+    @property
+    @lru_cache()
+    def blueprint(self):
+        """
+        The blueprint model
+        """
+        return self.model.blueprint.get(self.blueprint_id)
+
+    @property
+    @lru_cache()
+    def deployment(self):
+        """
+        The deployment model
+        """
+        return self.model.deployment.get(self.deployment_id)
+
+    @property
+    def nodes(self):
+        """
+        Iterator over nodes
+        """
+        return self.model.node.iter(
+            filters={'blueprint_id': self.blueprint_id})
+
+    @property
+    def node_instances(self):
+        """
+        Iterator over node instances
+        """
+        return self.model.node_instance.iter(filters={'deployment_id': self.deployment_id})
+
+    @property
+    def execution(self):
+        """
+        The execution model
+        """
+        return self.model.execution.get(self.execution_id)
+
+    @execution.setter
+    def execution(self, value):
+        """
+        Store the execution in the model storage
+        """
+        self.model.execution.store(value)
+
+    def download_blueprint_resource(self, destination, path=None):
+        """
+        Download a blueprint resource from the resource storage
+        """
+        return self.resource.blueprint.download(
+            entry_id=self.blueprint_id,
+            destination=destination,
+            path=path)
+
+    def download_deployment_resource(self, destination, path=None):
+        """
+        Download a deployment resource from the resource storage
+        """
+        return self.resource.deployment.download(
+            entry_id=self.deployment_id,
+            destination=destination,
+            path=path)
+
+    @lru_cache()
+    def get_deployment_resource_data(self, path=None):
+        """
+        Read a deployment resource as string from the resource storage
+        """
+        return self.resource.deployment.data(entry_id=self.deployment_id, path=path)
+
+    @lru_cache()
+    def get_blueprint_resource_data(self, path=None):
+        """
+        Read a blueprint resource as string from the resource storage
+        """
+        return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path)
+
+
+class _CurrentContext(threading.local):
+    """
+    Provides thread-level context, which sugarcoats the task api.
+    """
+
+    def __init__(self):
+        super(_CurrentContext, self).__init__()
+        self._workflow_context = None
+
+    def _set(self, value):
+        self._workflow_context = value
+
+    def get(self):
+        """
+        Retrieves the current workflow context
+        :return: the workflow context
+        :rtype: WorkflowContext
+        """
+        if self._workflow_context is not None:
+            return self._workflow_context
+        raise ContextException("No context was set")
+
+    @contextmanager
+    def push(self, workflow_context):
+        """
+        Switches the current context to the provided context
+        :param workflow_context: the context to switch to.
+        :yields: the current context
+        """
+        prev_workflow_context = self._workflow_context
+        self._set(workflow_context)
+        try:
+            yield self
+        finally:
+            self._set(prev_workflow_context)
+
+current = _CurrentContext()
+

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/contexts.py
----------------------------------------------------------------------
diff --git a/aria/contexts.py b/aria/contexts.py
deleted file mode 100644
index fdd26a2..0000000
--- a/aria/contexts.py
+++ /dev/null
@@ -1,211 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow and operation contexts
-"""
-
-from uuid import uuid4
-
-from aria.logger import LoggerMixin
-from aria.tools.lru_cache import lru_cache
-from aria.workflows.api.tasks_graph import TaskGraph
-
-
-class WorkflowContext(LoggerMixin):
-    """
-    Context object used during workflow creation and execution
-    """
-
-    def __init__(
-            self,
-            name,
-            model_storage,
-            resource_storage,
-            deployment_id,
-            workflow_id,
-            parameters=None,
-            **kwargs):
-        super(WorkflowContext, self).__init__(**kwargs)
-        self.name = name
-        self.id = str(uuid4())
-        self.model = model_storage
-        self.resource = resource_storage
-        self.deployment_id = deployment_id
-        self.workflow_id = workflow_id
-        self.execution_id = str(uuid4())
-        self.parameters = parameters or {}
-
-    def __repr__(self):
-        return (
-            '{name}(deployment_id={self.deployment_id}, '
-            'workflow_id={self.workflow_id}, '
-            'execution_id={self.execution_id})'.format(
-                name=self.__class__.__name__, self=self))
-
-    def operation(
-            self,
-            name,
-            operation_details,
-            node_instance,
-            inputs=None):
-        """
-        Called during workflow creation, return an operation context. This object should be added to
-        the task graph.
-        """
-        return OperationContext(
-            name=name,
-            operation_details=operation_details,
-            workflow_context=self,
-            node_instance=node_instance,
-            inputs=inputs or {})
-
-    @property
-    def task_graph(self):
-        """
-        The task graph class
-        """
-        return TaskGraph
-
-    @property
-    def blueprint_id(self):
-        """
-        The blueprint id
-        """
-        return self.deployment.blueprint_id
-
-    @property
-    @lru_cache()
-    def blueprint(self):
-        """
-        The blueprint model
-        """
-        return self.model.blueprint.get(self.blueprint_id)
-
-    @property
-    @lru_cache()
-    def deployment(self):
-        """
-        The deployment model
-        """
-        return self.model.deployment.get(self.deployment_id)
-
-    @property
-    def nodes(self):
-        """
-        Iterator over nodes
-        """
-        return self.model.node.iter(
-            filters={'blueprint_id': self.blueprint_id})
-
-    @property
-    def node_instances(self):
-        """
-        Iterator over node instances
-        """
-        return self.model.node_instance.iter(filters={'deployment_id': self.deployment_id})
-
-    @property
-    def execution(self):
-        """
-        The execution model
-        """
-        return self.model.execution.get(self.execution_id)
-
-    @execution.setter
-    def execution(self, value):
-        """
-        Store the execution in the model storage
-        """
-        self.model.execution.store(value)
-
-    def download_blueprint_resource(self, destination, path=None):
-        """
-        Download a blueprint resource from the resource storage
-        """
-        return self.resource.blueprint.download(
-            entry_id=self.blueprint_id,
-            destination=destination,
-            path=path)
-
-    def download_deployment_resource(self, destination, path=None):
-        """
-        Download a deployment resource from the resource storage
-        """
-        return self.resource.deployment.download(
-            entry_id=self.deployment_id,
-            destination=destination,
-            path=path)
-
-    @lru_cache()
-    def get_deployment_resource_data(self, path=None):
-        """
-        Read a deployment resource as string from the resource storage
-        """
-        return self.resource.deployment.data(entry_id=self.deployment_id, path=path)
-
-    @lru_cache()
-    def get_blueprint_resource_data(self, path=None):
-        """
-        Read a blueprint resource as string from the resource storage
-        """
-        return self.resource.blueprint.data(entry_id=self.blueprint_id, path=path)
-
-
-class OperationContext(LoggerMixin):
-    """
-    Context object used during operation creation and execution
-    """
-
-    def __init__(
-            self,
-            name,
-            operation_details,
-            workflow_context,
-            node_instance,
-            inputs=None):
-        super(OperationContext, self).__init__()
-        self.name = name
-        self.id = str(uuid4())
-        self.operation_details = operation_details
-        self.workflow_context = workflow_context
-        self.node_instance = node_instance
-        self.inputs = inputs or {}
-
-    def __repr__(self):
-        details = ', '.join(
-            '{0}={1}'.format(key, value)
-            for key, value in self.operation_details.items())
-        return '{name}({0})'.format(details, name=self.name)
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.workflow_context, attr)
-        except AttributeError:
-            return super(OperationContext, self).__getattribute__(attr)
-
-    @property
-    def operation(self):
-        """
-        The model operation
-        """
-        return self.model.operation.get(self.id)
-
-    @operation.setter
-    def operation(self, value):
-        """
-        Store the operation in the model storage
-        """
-        self.model.operation.store(value)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/decorators.py
----------------------------------------------------------------------
diff --git a/aria/decorators.py b/aria/decorators.py
index 7bc41b3..a07e2ee 100644
--- a/aria/decorators.py
+++ b/aria/decorators.py
@@ -20,12 +20,13 @@ Workflow and operation decorators
 from uuid import uuid4
 from functools import partial, wraps
 
-from aria.tools.validation import validate_function_arguments
+from . import context
+from .workflows.api import task_graph
+from .tools.validation import validate_function_arguments
 
 
 def workflow(
         func=None,
-        workflow_context=True,
         simple_workflow=True,
         suffix_template=''):
     """
@@ -34,31 +35,29 @@ def workflow(
     if func is None:
         return partial(
             workflow,
-            workflow_context=workflow_context,
             simple_workflow=simple_workflow,
             suffix_template=suffix_template)
 
     @wraps(func)
-    def _wrapper(context, **custom_kwargs):
+    def _wrapper(ctx, **workflow_parameters):
+
         workflow_name = _generate_workflow_name(
             func_name=func.__name__,
             suffix_template=suffix_template,
-            context=context,
-            **custom_kwargs)
-        func_kwargs = _create_func_kwargs(
-            custom_kwargs,
-            context,
-            add_context=workflow_context,
-            workflow_name=workflow_name)
-        validate_function_arguments(func, func_kwargs)
-        func(**func_kwargs)
-        return func_kwargs['graph']
+            ctx=ctx,
+            **workflow_parameters)
+
+        workflow_parameters.setdefault('ctx', ctx)
+        workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name))
+        validate_function_arguments(func, workflow_parameters)
+        with context.workflow.current.push(ctx):
+            func(**workflow_parameters)
+        return workflow_parameters['graph']
     return _wrapper
 
 
 def operation(
-        func=None,
-        operation_context=True):
+        func=None):
     """
     Operation decorator
     """
@@ -66,29 +65,25 @@ def operation(
         return partial(operation)
 
     @wraps(func)
-    def _wrapper(context, **custom_kwargs):
+    def _wrapper(ctx, **custom_kwargs):
         func_kwargs = _create_func_kwargs(
             custom_kwargs,
-            context,
-            add_context=operation_context)
+            ctx)
         validate_function_arguments(func, func_kwargs)
-        context.description = func.__doc__
+        ctx.description = func.__doc__
         return func(**func_kwargs)
     return _wrapper
 
 
-def _generate_workflow_name(func_name, context, suffix_template, **custom_kwargs):
+def _generate_workflow_name(func_name, ctx, suffix_template, **custom_kwargs):
     return '{func_name}.{suffix}'.format(
         func_name=func_name,
-        suffix=suffix_template.format(context=context, **custom_kwargs) or str(uuid4()))
+        suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4()))
 
 
 def _create_func_kwargs(
         kwargs,
-        context,
-        add_context=True,
+        ctx,
         workflow_name=None):
-    if add_context:
-        kwargs['context'] = context
-    kwargs.setdefault('graph', context.task_graph(workflow_name))
+    kwargs.setdefault('graph', ctx.task_graph(workflow_name))
     return kwargs

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/events/builtin_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py
index 2dfbd00..2abdd9f 100644
--- a/aria/events/builtin_event_handler.py
+++ b/aria/events/builtin_event_handler.py
@@ -36,37 +36,29 @@ from . import (
 
 @sent_task_signal.connect
 def _task_sent(task, *args, **kwargs):
-    operation_context = task.context
-    operation = operation_context.operation
-    operation.status = operation.SENT
-    operation_context.operation = operation
+    with task.update():
+        task.status = task.SENT
 
 
 @start_task_signal.connect
 def _task_started(task, *args, **kwargs):
-    operation_context = task.context
-    operation = operation_context.operation
-    operation.started_at = datetime.utcnow()
-    operation.status = operation.STARTED
-    operation_context.operation = operation
+    with task.update():
+        task.started_at = datetime.utcnow()
+        task.status = task.STARTED
 
 
 @on_failure_task_signal.connect
 def _task_failed(task, *args, **kwargs):
-    operation_context = task.context
-    operation = operation_context.operation
-    operation.ended_at = datetime.utcnow()
-    operation.status = operation.FAILED
-    operation_context.operation = operation
+    with task.update():
+        task.ended_at = datetime.utcnow()
+        task.status = task.FAILED
 
 
 @on_success_task_signal.connect
 def _task_succeeded(task, *args, **kwargs):
-    operation_context = task.context
-    operation = operation_context.operation
-    operation.ended_at = datetime.utcnow()
-    operation.status = operation.SUCCESS
-    operation_context.operation = operation
+    with task.update():
+        task.ended_at = datetime.utcnow()
+        task.status = task.SUCCESS
 
 
 @start_workflow_signal.connect

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/events/workflow_engine_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py
index 60138e1..2f74ded 100644
--- a/aria/events/workflow_engine_event_handler.py
+++ b/aria/events/workflow_engine_event_handler.py
@@ -33,36 +33,30 @@ from . import (
 
 @start_task_signal.connect
 def _start_task_handler(task, **kwargs):
-    task.logger.debug(
-        'Event: Starting task: {task.name}'.format(task=task))
+    task.logger.debug('Event: Starting task: {task.name}'.format(task=task))
 
 
 @on_success_task_signal.connect
 def _success_task_handler(task, **kwargs):
-    task.logger.debug(
-        'Event: Task success: {task.name}'.format(task=task))
+    task.logger.debug('Event: Task success: {task.name}'.format(task=task))
 
 
 @on_failure_task_signal.connect
 def _failure_operation_handler(task, **kwargs):
-    task.logger.error(
-        'Event: Task failure: {task.name}'.format(task=task),
-        exc_info=kwargs.get('exception', True))
+    task.logger.error('Event: Task failure: {task.name}'.format(task=task),
+                      exc_info=kwargs.get('exception', True))
 
 
 @start_workflow_signal.connect
 def _start_workflow_handler(context, **kwargs):
-    context.logger.debug(
-        'Event: Starting workflow: {context.name}'.format(context=context))
+    context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context))
 
 
 @on_failure_workflow_signal.connect
 def _failure_workflow_handler(context, **kwargs):
-    context.logger.debug(
-        'Event: Workflow failure: {context.name}'.format(context=context))
+    context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context))
 
 
 @on_success_workflow_signal.connect
 def _success_workflow_handler(context, **kwargs):
-    context.logger.debug(
-        'Event: Workflow success: {context.name}'.format(context=context))
+    context.logger.debug('Event: Workflow success: {context.name}'.format(context=context))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 4959126..0002cb5 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -37,7 +37,7 @@ class LoggerMixin(object):
 
     def __init__(self, *args, **kwargs):
         self.logger_name = self.logger_name or self.__class__.__name__
-        self.logger = _base_logger.getChild(self.logger_name)
+        self.logger = logging.getLogger('{0}.{1}'.format(_base_logger.name, self.logger_name))
         self.logger.setLevel(self.logger_level)
         super(LoggerMixin, self).__init__(*args, **kwargs)
 
@@ -63,7 +63,7 @@ class LoggerMixin(object):
 
     def __setstate__(self, obj_dict):
         vars(self).update(
-            logger=_base_logger.getChild(obj_dict['logger_name']),
+            logger=logging.getLogger('{0}.{1}'.format(_base_logger.name, obj_dict['logger_name'])),
             **obj_dict)
 
 
@@ -112,7 +112,7 @@ class _DefaultConsoleFormat(logging.Formatter):
                 self._fmt = '%(levelname)s: %(message)s'
         except AttributeError:
             return record.message
-        return super(_DefaultConsoleFormat, self).format(record)
+        return logging.Formatter.format(self, record)
 
 
 def create_file_log_handler(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index e5fc1ac..9aa7cf0 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -216,34 +216,6 @@ class Execution(Model):
     is_system_workflow = Field(type=bool, default=False)
 
 
-class Operation(Model):
-    """
-    A Model which represents an operation
-    """
-    PENDING = 'pending'
-    SENT = 'sent'
-    STARTED = 'started'
-    SUCCESS = 'success'
-    FAILED = 'failed'
-    STATES = (
-        PENDING,
-        SENT,
-        STARTED,
-        SUCCESS,
-        FAILED,
-    )
-    END_STATES = [SUCCESS, FAILED]
-
-    id = Field(type=basestring, default=uuid_generator)
-    status = Field(type=basestring, choices=STATES, default=PENDING)
-    execution_id = Field(type=basestring)
-    eta = Field(type=datetime, default=datetime.now)
-    started_at = Field(type=datetime, default=None)
-    ended_at = Field(type=datetime, default=None)
-    max_retries = Field(type=int, default=0)
-    retry_count = Field(type=int, default=0)
-
-
 class Relationship(Model):
     """
     A Model which represents a relationship
@@ -397,3 +369,37 @@ class Plugin(Model):
     excluded_wheels = Field()
     supported_py_versions = Field(type=list)
     uploaded_at = Field(type=datetime)
+
+
+class Task(Model):
+    """
+    A Model which represents a task
+    """
+    PENDING = 'pending'
+    SENT = 'sent'
+    STARTED = 'started'
+    SUCCESS = 'success'
+    FAILED = 'failed'
+    STATES = (
+        PENDING,
+        SENT,
+        STARTED,
+        SUCCESS,
+        FAILED,
+    )
+    END_STATES = [SUCCESS, FAILED]
+
+    id = Field(type=basestring, default=uuid_generator)
+    status = Field(type=basestring, choices=STATES, default=PENDING)
+    execution_id = Field(type=basestring)
+    eta = Field(type=datetime, default=datetime.now)
+    started_at = Field(type=datetime, default=None)
+    ended_at = Field(type=datetime, default=None)
+    max_retries = Field(type=int, default=1)
+    retry_count = Field(type=int, default=0)
+
+    # Operation specific fields
+    name = Field(type=basestring)
+    operation_details = Field(type=dict)
+    node_instance = PointerField(type=NodeInstance)
+    inputs = Field(type=dict, default=lambda: {})

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/tools/lru_cache.py
----------------------------------------------------------------------
diff --git a/aria/tools/lru_cache.py b/aria/tools/lru_cache.py
index 5863376..bb39b90 100755
--- a/aria/tools/lru_cache.py
+++ b/aria/tools/lru_cache.py
@@ -21,7 +21,11 @@ Function lru_cache implementation for python 2.7
 from time import time
 from functools import partial, wraps
 from itertools import imap
-from collections import OrderedDict
+
+try:
+    from collections import OrderedDict
+except ImportError:
+    from ordereddict import OrderedDict
 
 
 class _LRUCache(object):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/api/__init__.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/__init__.py b/aria/workflows/api/__init__.py
index ae1e83e..a3a17ee 100644
--- a/aria/workflows/api/__init__.py
+++ b/aria/workflows/api/__init__.py
@@ -12,3 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+"""
+Provides API for building tasks
+"""
+
+from . import task, task_graph

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/task.py b/aria/workflows/api/task.py
new file mode 100644
index 0000000..1070d99
--- /dev/null
+++ b/aria/workflows/api/task.py
@@ -0,0 +1,109 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Provides the tasks to be entered into the task graph
+"""
+from uuid import uuid4
+
+from ... import context
+
+
+class BaseTask(object):
+    """
+    Abstract task_graph task
+    """
+    def __init__(self, ctx=None, **kwargs):
+        if ctx is not None:
+            self._workflow_context = ctx
+        else:
+            self._workflow_context = context.workflow.current.get()
+        self._id = str(uuid4())
+
+    @property
+    def id(self):
+        """
+        uuid4 generated id
+        :return:
+        """
+        return self._id
+
+    @property
+    def workflow_context(self):
+        """
+        the context of the current workflow
+        :return:
+        """
+        return self._workflow_context
+
+
+class OperationTask(BaseTask):
+    """
+    Represents an operation task in the task_graph
+    """
+
+    def __init__(self,
+                 name,
+                 operation_details,
+                 node_instance,
+                 inputs=None):
+        """
+        Creates an operation task using the name, details, node instance and inputs.
+        :param name: the name of the operation.
+        :param operation_details: the details for the operation.
+        :param node_instance: the node instance on which this operation is registered.
+        :param inputs: operation inputs.
+        """
+        super(OperationTask, self).__init__()
+        self.name = name
+        self.operation_details = operation_details
+        self.node_instance = node_instance
+        self.inputs = inputs or {}
+
+
+class WorkflowTask(BaseTask):
+    """
+    Represents a workflow task in the task_graph
+    """
+    def __init__(self, workflow_func, **kwargs):
+        """
+        Creates a workflow based task using the workflow_func provided, and its kwargs
+        :param workflow_func: the function to run
+        :param kwargs: the kwargs that would be passed to the workflow_func
+        """
+        super(WorkflowTask, self).__init__(**kwargs)
+        kwargs['ctx'] = self.workflow_context
+        self._graph = workflow_func(**kwargs)
+
+    @property
+    def graph(self):
+        """
+        The graph constructed by the sub workflow
+        :return:
+        """
+        return self._graph
+
+    def __getattr__(self, item):
+        try:
+            return getattr(self._graph, item)
+        except AttributeError:
+            return super(WorkflowTask, self).__getattribute__(item)
+
+
+class StubTask(BaseTask):
+    """
+    Enables creating empty tasks.
+    """
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/api/task_graph.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/task_graph.py b/aria/workflows/api/task_graph.py
new file mode 100644
index 0000000..c88d343
--- /dev/null
+++ b/aria/workflows/api/task_graph.py
@@ -0,0 +1,290 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Task graph. Used by users to build workflows
+"""
+
+from uuid import uuid4
+from collections import Iterable
+
+from networkx import DiGraph, topological_sort
+
+from . import task as api_task
+
+
+class TaskNotInGraphError(Exception):
+    """
+    An error representing a scenario where a given task is not in the graph as expected
+    """
+    pass
+
+
+def _filter_out_empty_tasks(func=None):
+    if func is None:
+        return lambda f: _filter_out_empty_tasks(func=f)
+
+    def _wrapper(task, *tasks, **kwargs):
+        return func(*(t for t in [task] + list(tasks) if t), **kwargs)
+    return _wrapper
+
+
+class TaskGraph(object):
+    """
+    A tasks graph builder.
+    Build an operations flow graph
+    """
+
+    def __init__(self, name):
+        self.name = name
+        self._id = str(uuid4())
+        self._graph = DiGraph()
+
+    def __repr__(self):
+        return '{name}(id={self._id}, name={self.name}, graph={self._graph!r})'.format(
+            name=self.__class__.__name__, self=self)
+
+    @property
+    def id(self):
+        """
+        Represents the id of the graph
+        :return: graph id
+        """
+        return self._id
+
+    # graph traversal methods
+
+    @property
+    def tasks(self):
+        """
+        An iterator on tasks added to the graph
+        :yields: Iterator over all tasks in the graph
+        """
+        for _, data in self._graph.nodes_iter(data=True):
+            yield data['task']
+
+    def topological_order(self, reverse=False):
+        """
+        Returns topological sort on the graph
+        :param reverse: whether to reverse the sort
+        :return: a list which represents the topological sort
+        """
+        for task_id in topological_sort(self._graph, reverse=reverse):
+            yield self.get_task(task_id)
+
+    def get_dependencies(self, dependent_task):
+        """
+        Iterates over the task's dependencies
+        :param BaseTask dependent_task: The task whose dependencies are requested
+        :yields: Iterator over all tasks which dependent_task depends on
+        :raise: TaskNotInGraphError if dependent_task is not in the graph
+        """
+        if not self.has_tasks(dependent_task):
+            raise TaskNotInGraphError('Task id: {0}'.format(dependent_task.id))
+        for _, dependency_id in self._graph.out_edges_iter(dependent_task.id):
+            yield self.get_task(dependency_id)
+
+    def get_dependents(self, dependency_task):
+        """
+        Iterates over the task's dependents
+        :param BaseTask dependency_task: The task whose dependents are requested
+        :yields: Iterator over all tasks which depend on dependency_task
+        :raise: TaskNotInGraphError if dependency_task is not in the graph
+        """
+        if not self.has_tasks(dependency_task):
+            raise TaskNotInGraphError('Task id: {0}'.format(dependency_task.id))
+        for dependent_id, _ in self._graph.in_edges_iter(dependency_task.id):
+            yield self.get_task(dependent_id)
+
+    # task methods
+
+    def get_task(self, task_id):
+        """
+        Get a task instance that's been inserted to the graph by the task's id
+        :param basestring task_id: The task's id
+        :return: Requested task
+        :rtype: BaseTask
+        :raise: TaskNotInGraphError if no task found in the graph with the given id
+        """
+        if not self._graph.has_node(task_id):
+            raise TaskNotInGraphError('Task id: {0}'.format(task_id))
+        data = self._graph.node[task_id]
+        return data['task']
+
+    @_filter_out_empty_tasks
+    def add_tasks(self, *tasks):
+        """
+        Add tasks to the graph
+        :param tasks: The tasks to add (tasks or nested iterables of tasks)
+        :return: A list of added tasks
+        :rtype: list
+        """
+        assert all([isinstance(task, (api_task.BaseTask, Iterable)) for task in tasks])
+        return_tasks = []
+
+        for task in tasks:
+            if isinstance(task, Iterable):
+                return_tasks += self.add_tasks(*task)
+            elif not self.has_tasks(task):
+                self._graph.add_node(task.id, task=task)
+                return_tasks.append(task)
+
+        return return_tasks
+
+    @_filter_out_empty_tasks
+    def remove_tasks(self, *tasks):
+        """
+        Remove the provided tasks from the graph
+        :param tasks: The tasks to remove (tasks or nested iterables of tasks)
+        :return: A list of removed tasks
+        :rtype: list
+        """
+        return_tasks = []
+
+        for task in tasks:
+            if isinstance(task, Iterable):
+                return_tasks += self.remove_tasks(*task)
+            elif self.has_tasks(task):
+                self._graph.remove_node(task.id)
+                return_tasks.append(task)
+
+        return return_tasks
+
+    @_filter_out_empty_tasks
+    def has_tasks(self, *tasks):
+        """
+        Check whether the given tasks are in the graph or not
+        :param tasks: The tasks to check (tasks or nested iterables of tasks)
+        :return: True if all tasks are in the graph, otherwise False
+        :rtype: bool
+        """
+        assert all(isinstance(t, (api_task.BaseTask, Iterable)) for t in tasks)
+        return_value = True
+
+        for task in tasks:
+            if isinstance(task, Iterable):
+                return_value &= self.has_tasks(*task)
+            else:
+                return_value &= self._graph.has_node(task.id)
+
+        return return_value
+
+    def add_dependency(self, dependent, dependency):
+        """
+        Add a dependency for one item (task, sequence or parallel) on another
+        The dependent will only be executed after the dependency terminates
+        If either of the items is either a sequence or a parallel,
+         multiple dependencies may be added
+        :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel)
+        :return: None; if the dependency between the two already exists,
+         this is a no-op
+        :raise TaskNotInGraphError if either the dependent or dependency are tasks which
+         are not in the graph
+        """
+        if not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+            raise TaskNotInGraphError()
+
+        if self.has_dependency(dependent, dependency):
+            return
+
+        if isinstance(dependent, Iterable):
+            for dependent_task in dependent:
+                self.add_dependency(dependent_task, dependency)
+        else:
+            if isinstance(dependency, Iterable):
+                for dependency_task in dependency:
+                    self.add_dependency(dependent, dependency_task)
+            else:
+                self._graph.add_edge(dependent.id, dependency.id)
+
+    def has_dependency(self, dependent, dependency):
+        """
+        Check whether one item (task, sequence or parallel) depends on another
+
+        Note that if either of the items is either a sequence or a parallel,
+        and some of the dependencies exist in the graph but not all of them,
+        this method will return False
+
+        :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel)
+        :return: True if the dependency between the two exists, otherwise False
+        :rtype: bool
+        :raise TaskNotInGraphError if either the dependent or dependency are tasks
+         which are not in the graph
+        """
+        if not (dependent and dependency):
+            return False
+        elif not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+            raise TaskNotInGraphError()
+
+        return_value = True
+
+        if isinstance(dependent, Iterable):
+            for dependent_task in dependent:
+                return_value &= self.has_dependency(dependent_task, dependency)
+        else:
+            if isinstance(dependency, Iterable):
+                for dependency_task in dependency:
+                    return_value &= self.has_dependency(dependent, dependency_task)
+            else:
+                return_value &= self._graph.has_edge(dependent.id, dependency.id)
+
+        return return_value
+
+    def remove_dependency(self, dependent, dependency):
+        """
+        Remove a dependency for one item (task, sequence or parallel) on another
+
+        Note that if either of the items is either a sequence or a parallel, and some of
+        the dependencies exist in the graph but not all of them, this method will not remove
+        any of the dependencies and return False
+
+        :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel)
+        :return: None; if the dependency between the two doesn't exist,
+         this is a no-op
+        :raise TaskNotInGraphError if either the dependent or dependency are tasks
+         which are not in the graph
+        """
+        if not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+            raise TaskNotInGraphError()
+
+        if not self.has_dependency(dependent, dependency):
+            return
+
+        if isinstance(dependent, Iterable):
+            for dependent_task in dependent:
+                self.remove_dependency(dependent_task, dependency)
+        elif isinstance(dependency, Iterable):
+            for dependency_task in dependency:
+                self.remove_dependency(dependent, dependency_task)
+        else:
+            self._graph.remove_edge(dependent.id, dependency.id)
+
+    @_filter_out_empty_tasks
+    def sequence(self, *tasks):
+        """
+        Create and insert a sequence into the graph, effectively each task i depends on i-1
+        :param tasks: an iterable of tasks to run in sequence
+        :return: the provided tasks
+        """
+        if tasks:
+            self.add_tasks(*tasks)
+
+            for i in xrange(1, len(tasks)):
+                self.add_dependency(tasks[i], tasks[i-1])
+
+        return tasks

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/api/tasks_graph.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/tasks_graph.py b/aria/workflows/api/tasks_graph.py
deleted file mode 100644
index 5160345..0000000
--- a/aria/workflows/api/tasks_graph.py
+++ /dev/null
@@ -1,203 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from uuid import uuid4
-
-from networkx import DiGraph, topological_sort
-
-from aria.tools.validation import ValidatorMixin
-
-
-class TaskNotFoundError(Exception):
-    pass
-
-
-class TaskNotInGraphError(Exception):
-    pass
-
-
-class TaskGraph(ValidatorMixin):
-    """
-    A task graph builder.
-    Build a operations flow graph
-    """
-
-    def __init__(self, name):
-        self.name = name
-        self.id = str(uuid4())
-        self.graph = DiGraph()
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.graph, attr)
-        except AttributeError:
-            return super(TaskGraph, self).__getattribute__(attr)
-
-    def __repr__(self):
-        return '{name}(id={self.id}, name={self.name}, graph={self.graph!r})'.format(
-            name=self.__class__.__name__, self=self)
-
-    @property
-    def tasks(self):
-        """
-        An iterator on tasks added to the graph
-        """
-        for _, data in self.graph.nodes_iter(data=True):
-            yield data['task']
-
-    @property
-    def leaf_tasks(self):
-        for task in self.tasks_in_order():
-            if not self.graph.predecessors(task.id):
-                yield task
-
-    def task_tree(self, reverse=False):
-        """
-        Iterates over the tasks to be executed in topological order and their dependencies.
-        :param reverse: reverse the order
-        """
-        for task in self.tasks_in_order(reverse=reverse):
-            yield task, self.task_dependencies(task)
-
-    def tasks_in_order(self, reverse=False):
-        """
-        Iterates over the tasks to be executed in topological order
-        :param reverse: reverse the order
-        """
-        for task_id in topological_sort(self.graph, reverse=reverse):
-            yield self.graph.node[task_id]['task']
-
-    def has_dependencies(self, task):
-        return len(self.task_dependencies(task)) > 0
-
-    def task_dependencies(self, task):
-        """
-        Iterates over the task dependencies
-        """
-        for task_ids in self.graph.edges_iter(task.id):
-            for task_id in task_ids:
-                if task.id != task_id:
-                    yield self.get_task(task_id)
-
-    def add_task(self, task):
-        """
-        Add a task to this graph
-        :param WorkflowTask|TaskGraph task: The task
-        """
-        self.graph.add_node(task.id, task=task)
-
-    def get_task(self, task_id):
-        """
-        Get a task instance that was inserted to this graph by its id
-
-        :param basestring task_id: the task id
-        :return: requested task
-        :rtype: WorkflowTask|TaskGraph
-        :raise: TaskNotFoundError if no task found with given id
-        """
-        try:
-            data = self.graph.node[task_id]
-            return data['task']
-        except KeyError:
-            raise TaskNotFoundError('Task id: {0}'.format(task_id))
-
-    def remove_task(self, task):
-        """
-        Remove the provided task from the graph
-        :param WorkflowTask|graph task: The task
-        """
-        self.graph.remove_node(task.id)
-
-    def dependency(self, source_task, after):
-        """
-        Add a dependency between tasks.
-        The source task will only be executed after the target task terminates.
-        A source task may depend on several tasks,
-        In which case it will only be executed after all its target tasks will terminate.
-
-        tasks flow order:
-        after -> source_task
-
-        :param WorkflowTask|TaskGraph source_task: The source task
-        :type source_task: WorkflowTask
-        :param list after: The target task
-        :raise TaskNotInGraphError
-        """
-        if not self.graph.has_node(source_task.id):
-            raise TaskNotInGraphError(
-                'source task {0!r} is not in graph (task id: {0.id})'.format(source_task))
-        for target_task in after:
-            if not self.graph.has_node(target_task.id):
-                raise TaskNotInGraphError(
-                    'target task {0!r} is not in graph (task id: {0.id})'.format(target_task))
-            self.graph.add_edge(source_task.id, target_task.id)
-
-    # workflow creation helper methods
-    def chain(self, tasks, after=()):
-        """
-        create a chain of tasks.
-        tasks will be added to the graph with a dependency between
-        the tasks by order.
-
-        tasks flow order:
-        if tasks = (task0, task1, ..., taskn)
-        after -> task0 -> task1 -> ... -> taskn
-
-        :param tasks: list of WorkflowTask instances.
-        :param after: target to the sequence
-        """
-        for source_task in tasks:
-            self.add_task(source_task)
-            self.dependency(source_task, after=after)
-            after = (source_task,)
-
-    def fan_out(self, tasks, after=()):
-        """
-        create a fan-out.
-        tasks will be added to the graph with a dependency to
-        the target task.
-
-        tasks flow order:
-        if tasks = (task0, task1, ..., taskn)
-        after      -> task0
-                   |-> task1
-                   |...
-                   \-> taskn
-
-        :param tasks: list of WorkflowTask instances.
-        :param after: target to the tasks
-        """
-        for source_task in tasks:
-            self.add_task(source_task)
-            self.dependency(source_task, after=after)
-
-    def fan_in(self, source_task, after=None):
-        """
-        create a fan-in.
-        source task will be added to the graph with a dependency to
-        the tasks.
-
-        tasks flow order:
-        if after = (task0, task1, ..., taskn)
-        task0\
-        task1|-> source_task
-        ...  |
-        taskn/
-
-        :param source_task: source to the tasks
-        :param after: list of WorkflowTask instances.
-        """
-        self.add_task(source_task)
-        self.dependency(source_task, after=after)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/builtin/execute_operation.py
----------------------------------------------------------------------
diff --git a/aria/workflows/builtin/execute_operation.py b/aria/workflows/builtin/execute_operation.py
index 9e87c30..ddbb8e7 100644
--- a/aria/workflows/builtin/execute_operation.py
+++ b/aria/workflows/builtin/execute_operation.py
@@ -24,7 +24,7 @@ from .workflows import execute_operation_on_instance
 
 @workflow
 def execute_operation(
-        context,
+        ctx,
         graph,
         operation,
         operation_kwargs,
@@ -37,7 +37,7 @@ def execute_operation(
     """
     The execute_operation workflow
 
-    :param WorkflowContext context: the workflow context
+    :param WorkflowContext ctx: the workflow context
     :param TaskGraph graph: the graph which will describe the workflow.
     :param basestring operation: the operation name to execute
     :param dict operation_kwargs:
@@ -52,7 +52,7 @@ def execute_operation(
     subgraphs = {}
     # filtering node instances
     filtered_node_instances = list(_filter_node_instances(
-        context=context,
+        context=ctx,
         node_ids=node_ids,
         node_instance_ids=node_instance_ids,
         type_names=type_names))
@@ -60,31 +60,31 @@ def execute_operation(
     if run_by_dependency_order:
         filtered_node_instances_ids = set(node_instance.id
                                           for node_instance in filtered_node_instances)
-        for node_instance in context.node_instances:
+        for node_instance in ctx.node_instances:
             if node_instance.id not in filtered_node_instances_ids:
-                subgraphs[node_instance.id] = context.task_graph(
+                subgraphs[node_instance.id] = ctx.task_graph(
                     name='execute_operation_stub_{0}'.format(node_instance.id))
 
     # registering actual tasks to sequences
     for node_instance in filtered_node_instances:
-        node_instance_sub_workflow = execute_operation_on_instance(
-            context=context,
-            graph=graph,
-            node_instance=node_instance,
-            operation=operation,
-            operation_kwargs=operation_kwargs,
-            allow_kwargs_override=allow_kwargs_override)
-        subgraphs[node_instance.id] = node_instance_sub_workflow
+        graph.add_tasks(
+            execute_operation_on_instance(
+                node_instance=node_instance,
+                operation=operation,
+                operation_kwargs=operation_kwargs,
+                allow_kwargs_override=allow_kwargs_override
+            )
+        )
 
     for _, node_instance_sub_workflow in subgraphs.items():
-        graph.add_task(node_instance_sub_workflow)
+        graph.add_tasks(node_instance_sub_workflow)
 
     # adding tasks dependencies if required
     if run_by_dependency_order:
-        for node_instance in context.node_instances:
+        for node_instance in ctx.node_instances:
             for relationship_instance in node_instance.relationship_instances:
-                graph.dependency(source_task=subgraphs[node_instance.id],
-                                 after=[subgraphs[relationship_instance.target_id]])
+                graph.add_dependency(source_task=subgraphs[node_instance.id],
+                                     after=[subgraphs[relationship_instance.target_id]])
 
 
 def _filter_node_instances(context, node_ids=(), node_instance_ids=(), type_names=()):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/builtin/heal.py
----------------------------------------------------------------------
diff --git a/aria/workflows/builtin/heal.py b/aria/workflows/builtin/heal.py
index cab2e6e..dc320dc 100644
--- a/aria/workflows/builtin/heal.py
+++ b/aria/workflows/builtin/heal.py
@@ -19,50 +19,47 @@ Builtin heal workflow
 
 from aria import workflow
 
-from .uninstall import uninstall
-from .install import install
-from .workflows import relationship_tasks
+from .workflows import relationship_tasks, install_node_instance, uninstall_node_instance
+from ..api import task
 
 
 @workflow
-def heal(context, graph, node_instance_id):
+def heal(ctx, graph, node_instance_id):
     """
     The heal workflow
 
-    :param WorkflowContext context: the workflow context
+    :param WorkflowContext ctx: the workflow context
     :param TaskGraph graph: the graph which will describe the workflow.
     :param node_instance_id: the id of the node instance to heal
     :return:
     """
-    failing_node = context.storage.node_instance.get(node_instance_id)
-    host_node = context.storage.node_instance.get(failing_node.host_id)
-    failed_node_instance_subgraph = _get_contained_subgraph(context, host_node)
+    failing_node = ctx.model.node_instance.get(node_instance_id)
+    host_node = ctx.model.node_instance.get(failing_node.host_id)
+    failed_node_instance_subgraph = _get_contained_subgraph(ctx, host_node)
     failed_node_instance_ids = list(n.id for n in failed_node_instance_subgraph)
 
-    targeted_node_instances = [
-        context.storage.node_instance.get(relationship_instance.target_id)
-        for node_instance in failed_node_instance_subgraph
-        for relationship_instance in node_instance.relationship_instances
-        if relationship_instance.target_id not in failed_node_instance_ids
-    ]
-
-    graph.chain([
-        heal_uninstall(
-            context=context,
-            failing_node_instances=failed_node_instance_subgraph,
-            targeted_node_instances=targeted_node_instances),
-        heal_install(
-            context=context,
-            failing_node_instances=failed_node_instance_subgraph,
-            targeted_node_instances=targeted_node_instances)
-    ])
+    targeted_node_instances = [node_instance for node_instance in ctx.node_instances
+                               if node_instance.id not in failed_node_instance_ids]
+
+    uninstall_subgraph = task.WorkflowTask(
+        heal_uninstall,
+        failing_node_instances=failed_node_instance_subgraph,
+        targeted_node_instances=targeted_node_instances
+    )
+
+    install_subgraph = task.WorkflowTask(
+        heal_install,
+        failing_node_instances=failed_node_instance_subgraph,
+        targeted_node_instances=targeted_node_instances)
+
+    graph.sequence(uninstall_subgraph, install_subgraph)
 
 
 @workflow(suffix_template='{failing_node_instances}')
-def heal_uninstall(context, graph, failing_node_instances, targeted_node_instances):
+def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances):
     """
     the uninstall part of the heal mechanism
-    :param WorkflowContext context: the workflow context
+    :param WorkflowContext ctx: the workflow context
     :param TaskGraph graph: the task graph to edit.
     :param failing_node_instances: the failing nodes to heal.
     :param targeted_node_instances: the targets of the relationships where the failing node are
@@ -73,46 +70,49 @@ def heal_uninstall(context, graph, failing_node_instances, targeted_node_instanc
 
     # Create install stub workflow for each unaffected node instance
     for node_instance in targeted_node_instances:
-        node_instance_sub_workflow = context.task_graph(
-            name='uninstall_stub_{0}'.format(node_instance.id))
+        node_instance_stub = task.StubTask()
+        node_instance_sub_workflows[node_instance.id] = node_instance_stub
+        graph.add_tasks(node_instance_stub)
+
+    # create uninstall sub workflow for every node instance
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = task.WorkflowTask(uninstall_node_instance,
+                                                       node_instance=node_instance)
         node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
-        graph.add_task(node_instance_sub_workflow)
+        graph.add_tasks(node_instance_sub_workflow)
 
-    # Create install sub workflow for each failing node
-    uninstall(
-        context=context,
-        graph=graph,
-        node_instances=failing_node_instances,
-        node_instance_sub_workflows=node_instance_sub_workflows)
+    # create dependencies between the node instance sub workflows
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
+        for relationship_instance in reversed(node_instance.relationship_instances):
+            graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id],
+                                 node_instance_sub_workflow)
 
-    # Add operations for intact nodes depending on a node instance
-    # belonging to node_instances
+    # Add operations for intact nodes depending on a node instance belonging to node_instances
     for node_instance in targeted_node_instances:
         node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
 
         for relationship_instance in reversed(node_instance.relationship_instances):
-            target_node_instance = context.storage.node_instance.get(
-                relationship_instance.target_id)
-            if target_node_instance in failing_node_instances:
-                after_tasks = [node_instance_sub_workflows[relationship.target_id]
-                               for relationship in node_instance.relationship_instances]
+            target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id)
+            target_node_instance_subgraph = node_instance_sub_workflows[target_node_instance.id]
+            graph.add_dependency(target_node_instance_subgraph, node_instance_sub_workflow)
 
-            elif target_node_instance in targeted_node_instances:
-                after_tasks = [relationship_tasks(
+            if target_node_instance in failing_node_instances:
+                dependency = relationship_tasks(
+                    graph=graph,
                     node_instance=node_instance,
                     relationship_instance=relationship_instance,
-                    context=context,
-                    operation_name='aria.interfaces.relationship_lifecycle.unlink')]
+                    context=ctx,
+                    operation_name='aria.interfaces.relationship_lifecycle.unlink')
 
-            if after_tasks:
-                graph.dependency(source_task=node_instance_sub_workflow, after=after_tasks)
+                graph.add_dependency(node_instance_sub_workflow, dependency)
 
 
 @workflow(suffix_template='{failing_node_instances}')
-def heal_install(context, graph, failing_node_instances, targeted_node_instances):
+def heal_install(ctx, graph, failing_node_instances, targeted_node_instances):
     """
     the install part of the heal mechanism
-    :param WorkflowContext context: the workflow context
+    :param WorkflowContext ctx: the workflow context
     :param TaskGraph graph: the task graph to edit.
     :param failing_node_instances: the failing nodes to heal.
     :param targeted_node_instances: the targets of the relationships where the failing node are
@@ -123,17 +123,24 @@ def heal_install(context, graph, failing_node_instances, targeted_node_instances
 
     # Create install sub workflow for each unaffected
     for node_instance in targeted_node_instances:
-        node_instance_sub_workflow = context.task_graph(
-            name='install_stub_{0}'.format(node_instance.id))
-        node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
-        graph.add_task(node_instance_sub_workflow)
+        node_instance_stub = task.StubTask()
+        node_instance_sub_workflows[node_instance.id] = node_instance_stub
+        graph.add_tasks(node_instance_stub)
 
     # create install sub workflow for every node instance
-    install(
-        context=context,
-        graph=graph,
-        node_instances=failing_node_instances,
-        node_instance_sub_workflows=node_instance_sub_workflows)
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = task.WorkflowTask(install_node_instance,
+                                                       node_instance=node_instance)
+        node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
+        graph.add_tasks(node_instance_sub_workflow)
+
+    # create dependencies between the node instance sub workflows
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
+        if node_instance.relationship_instances:
+            dependencies = [node_instance_sub_workflows[relationship_instance.target_id]
+                            for relationship_instance in node_instance.relationship_instances]
+            graph.add_dependency(node_instance_sub_workflow, dependencies)
 
     # Add operations for intact nodes depending on a node instance
     # belonging to node_instances
@@ -141,35 +148,33 @@ def heal_install(context, graph, failing_node_instances, targeted_node_instances
         node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
 
         for relationship_instance in node_instance.relationship_instances:
-            target_node_instance = context.storage.node_instance.get(
-                relationship_instance.target_id)
-            if target_node_instance in failing_node_instances:
-                after_tasks = [node_instance_sub_workflows[relationship.target_id]
-                               for relationship in node_instance.relationship_instances]
+            target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id)
+            target_node_instance_subworkflow = node_instance_sub_workflows[target_node_instance.id]
+            graph.add_dependency(node_instance_sub_workflow, target_node_instance_subworkflow)
 
-            elif target_node_instance in targeted_node_instances:
-                after_tasks = [relationship_tasks(
+            if target_node_instance in failing_node_instances:
+                dependent = relationship_tasks(
+                    graph=graph,
                     node_instance=node_instance,
                     relationship_instance=relationship_instance,
-                    context=context,
-                    operation_name='aria.interfaces.relationship_lifecycle.establish')]
+                    context=ctx,
+                    operation_name='aria.interfaces.relationship_lifecycle.establish')
 
-            if after_tasks:
-                graph.dependency(source_task=node_instance_sub_workflow, after=after_tasks)
+                graph.add_dependency(dependent, node_instance_sub_workflow)
 
 
 def _get_contained_subgraph(context, host_node_instance):
-    contained_instances = set(node_instance
-                              for node_instance in context.node_instances
-                              if node_instance.host_id == host_node_instance.id and
-                              node_instance.id != node_instance.host_id)
-    result = {host_node_instance}
+    contained_instances = [node_instance
+                           for node_instance in context.node_instances
+                           if node_instance.host_id == host_node_instance.id and
+                           node_instance.id != node_instance.host_id]
+    result = [host_node_instance]
 
     if not contained_instances:
         return result
 
-    result.update(contained_instances)
+    result.extend(contained_instances)
     for node_instance in contained_instances:
-        result.update(_get_contained_subgraph(context, node_instance))
+        result.extend(_get_contained_subgraph(context, node_instance))
 
-    return result
+    return set(result)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/builtin/install.py
----------------------------------------------------------------------
diff --git a/aria/workflows/builtin/install.py b/aria/workflows/builtin/install.py
index 35d2968..0ab3ad6 100644
--- a/aria/workflows/builtin/install.py
+++ b/aria/workflows/builtin/install.py
@@ -20,13 +20,14 @@ Builtin install workflow
 from aria import workflow
 
 from .workflows import install_node_instance
+from ..api import task
 
 
 @workflow
-def install(context, graph, node_instances=(), node_instance_sub_workflows=None):
+def install(ctx, graph, node_instances=(), node_instance_sub_workflows=None):
     """
     The install workflow
-    :param WorkflowContext context: the workflow context
+    :param WorkflowContext ctx: the workflow context
     :param TaskGraph graph: the graph which will describe the workflow.
     :param node_instances: the node instances on which to run the workflow
     :param dict node_instance_sub_workflows: a dictionary of subworkflows  with id as key and
@@ -34,21 +35,19 @@ def install(context, graph, node_instances=(), node_instance_sub_workflows=None)
     :return:
     """
     node_instance_sub_workflows = node_instance_sub_workflows or {}
-    node_instances = node_instances or list(context.node_instances)
+    node_instances = node_instances or list(ctx.node_instances)
 
     # create install sub workflow for every node instance
     for node_instance in node_instances:
-        node_instance_sub_workflow = install_node_instance(
-            context=context,
-            node_instance=node_instance)
+        node_instance_sub_workflow = task.WorkflowTask(install_node_instance,
+                                                       node_instance=node_instance)
         node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
-        graph.add_task(node_instance_sub_workflow)
+        graph.add_tasks(node_instance_sub_workflow)
 
     # create dependencies between the node instance sub workflow
     for node_instance in node_instances:
         node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
-        graph.dependency(
-            source_task=node_instance_sub_workflow,
-            after=[
-                node_instance_sub_workflows[relationship.target_id]
-                for relationship in node_instance.relationship_instances])
+        if node_instance.relationship_instances:
+            dependencies = [node_instance_sub_workflows[relationship_instance.target_id]
+                            for relationship_instance in node_instance.relationship_instances]
+            graph.add_dependency(node_instance_sub_workflow, dependencies)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8947f72c/aria/workflows/builtin/uninstall.py
----------------------------------------------------------------------
diff --git a/aria/workflows/builtin/uninstall.py b/aria/workflows/builtin/uninstall.py
index 47c8259..f4e965c 100644
--- a/aria/workflows/builtin/uninstall.py
+++ b/aria/workflows/builtin/uninstall.py
@@ -20,10 +20,11 @@ Builtin uninstall workflow
 from aria import workflow
 
 from .workflows import uninstall_node_instance
+from ..api import task
 
 
 @workflow
-def uninstall(context, graph, node_instances=(), node_instance_sub_workflows=None):
+def uninstall(ctx, graph, node_instances=(), node_instance_sub_workflows=None):
     """
     The uninstall workflow
     :param WorkflowContext context: the workflow context
@@ -34,20 +35,18 @@ def uninstall(context, graph, node_instances=(), node_instance_sub_workflows=Non
     :return:
     """
     node_instance_sub_workflows = node_instance_sub_workflows or {}
-    node_instances = node_instances or list(context.node_instances)
+    node_instances = node_instances or list(ctx.node_instances)
 
     # create install sub workflow for every node instance
     for node_instance in node_instances:
-        node_instance_sub_workflow = uninstall_node_instance(
-            context=context,
-            node_instance=node_instance)
+        node_instance_sub_workflow = task.WorkflowTask(uninstall_node_instance,
+                                                       node_instance=node_instance)
         node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
-        graph.add_task(node_instance_sub_workflow)
+        graph.add_tasks(node_instance_sub_workflow)
 
     # create dependencies between the node instance sub workflow
     for node_instance in node_instances:
         node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
         for relationship_instance in reversed(node_instance.relationship_instances):
-            graph.dependency(
-                source_task=node_instance_sub_workflows[relationship_instance.target_id],
-                after=[node_instance_sub_workflow])
+            graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id],
+                                 node_instance_sub_workflow)


Mime
View raw message