ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From emblempar...@apache.org
Subject [1/4] incubator-ariatosca git commit: ARIA-163 Update node state for empty tasks [Forced Update!]
Date Fri, 05 May 2017 18:00:40 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-139-attributes b3ee51f78 -> 936feaf1f (forced update)


ARIA-163 Update node state for empty tasks

Additional changes:
 * removed `for_node` and `for_relationship` from the api OperationTask.
 * api based OperationTask could also have an empty implementation.
 * each core task wields its own executor.
 * Reordered some of the helper functions for creating tasks.
 * intoduced 2 new executors: StubTaskExecutor (for stub tasks) and EmptyOperationExecutor (for empty tasks)


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8ca3ff29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8ca3ff29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8ca3ff29

Branch: refs/heads/ARIA-139-attributes
Commit: 8ca3ff297b71e270eccdd9a2e6b8bf468ccdab5d
Parents: 0878526
Author: max-orlov <maxim@gigaspaces.com>
Authored: Sun Apr 30 16:05:27 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Thu May 4 17:35:56 2017 +0300

----------------------------------------------------------------------
 aria/logger.py                                  |   2 +
 aria/orchestrator/workflows/api/task.py         | 185 ++++++++++---------
 .../workflows/builtin/execute_operation.py      |  28 +--
 aria/orchestrator/workflows/builtin/heal.py     |   4 +-
 aria/orchestrator/workflows/builtin/install.py  |   9 +-
 aria/orchestrator/workflows/builtin/start.py    |   4 +-
 aria/orchestrator/workflows/builtin/stop.py     |   4 +-
 .../orchestrator/workflows/builtin/uninstall.py |  11 +-
 aria/orchestrator/workflows/builtin/utils.py    | 138 --------------
 .../orchestrator/workflows/builtin/workflows.py |  71 ++++---
 aria/orchestrator/workflows/core/engine.py      |  12 +-
 .../workflows/core/events_handler.py            |   1 -
 aria/orchestrator/workflows/core/task.py        |  35 ++--
 aria/orchestrator/workflows/core/translation.py |  33 ++--
 aria/orchestrator/workflows/events_logging.py   |  13 +-
 aria/orchestrator/workflows/executor/base.py    |  11 ++
 aria/orchestrator/workflows/executor/dry.py     |   1 -
 .../profiles/tosca-simple-1.0/interfaces.yaml   |   5 +
 tests/end2end/test_hello_world.py               |   5 +-
 tests/end2end/testenv.py                        |   2 +-
 tests/orchestrator/context/test_operation.py    |  32 ++--
 tests/orchestrator/context/test_serialize.py    |   2 +-
 tests/orchestrator/context/test_toolbelt.py     |   8 +-
 .../orchestrator/execution_plugin/test_local.py |   4 +-
 tests/orchestrator/execution_plugin/test_ssh.py |   4 +-
 tests/orchestrator/workflows/api/test_task.py   |  16 +-
 .../orchestrator/workflows/core/test_engine.py  |  60 +++---
 .../orchestrator/workflows/core/test_events.py  |   4 +-
 tests/orchestrator/workflows/core/test_task.py  |  15 +-
 .../test_task_graph_into_execution_graph.py     |  25 ++-
 ...process_executor_concurrent_modifications.py |  18 +-
 .../executor/test_process_executor_extension.py |   9 +-
 .../test_process_executor_tracked_changes.py    |   9 +-
 .../tosca-simple-1.0/node-cellar/workflows.py   |  10 +-
 34 files changed, 361 insertions(+), 429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 8e15f5b..97d3878 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -195,6 +195,8 @@ class _SQLAlchemyHandler(logging.Handler):
         except BaseException:
             self._session.rollback()
             raise
+        finally:
+            self._session.close()
 
 
 _default_file_formatter = logging.Formatter(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 82c40c3..cb79eb3 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -21,6 +21,7 @@ from ... import context
 from ....modeling import models
 from ....modeling import utils as modeling_utils
 from ....utils.uuid import generate_uuid
+from .. import exceptions
 
 
 class BaseTask(object):
@@ -71,102 +72,44 @@ class OperationTask(BaseTask):
         Do not call this constructor directly. Instead, use :meth:`for_node` or
         :meth:`for_relationship`.
         """
-
-        actor_type = type(actor).__name__.lower()
         assert isinstance(actor, (models.Node, models.Relationship))
-        assert actor_type in ('node', 'relationship')
-        assert interface_name and operation_name
         super(OperationTask, self).__init__()
-
         self.actor = actor
-        self.max_attempts = (self.workflow_context._task_max_attempts
-                             if max_attempts is None else max_attempts)
-        self.retry_interval = (self.workflow_context._task_retry_interval
-                               if retry_interval is None else retry_interval)
-        self.ignore_failure = (self.workflow_context._task_ignore_failure
-                               if ignore_failure is None else ignore_failure)
         self.interface_name = interface_name
         self.operation_name = operation_name
+        self.max_attempts = max_attempts or self.workflow_context._task_max_attempts
+        self.retry_interval = retry_interval or self.workflow_context._task_retry_interval
+        self.ignore_failure = \
+            self.workflow_context._task_ignore_failure if ignore_failure is None else ignore_failure
+        self.name = OperationTask.NAME_FORMAT.format(type=type(actor).__name__.lower(),
+                                                     name=actor.name,
+                                                     interface=self.interface_name,
+                                                     operation=self.operation_name)
+        # Creating OperationTask directly should raise an error when there is no
+        # interface/operation.
+
+        if not has_operation(self.actor, self.interface_name, self.operation_name):
+            raise exceptions.OperationNotFoundException(
+                'Could not find operation "{self.operation_name}" on interface '
+                '"{self.interface_name}" for {actor_type} "{actor.name}"'.format(
+                    self=self,
+                    actor_type=type(actor).__name__.lower(),
+                    actor=actor)
+            )
 
         operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
         self.plugin = operation.plugin
         self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
         self.implementation = operation.implementation
-        self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
-                                                     name=actor.name,
-                                                     interface=self.interface_name,
-                                                     operation=self.operation_name)
 
     def __repr__(self):
         return self.name
 
-    @classmethod
-    def for_node(cls,
-                 node,
-                 interface_name,
-                 operation_name,
-                 max_attempts=None,
-                 retry_interval=None,
-                 ignore_failure=None,
-                 inputs=None):
-        """
-        Creates an operation on a node.
-
-        :param node: The node on which to run the operation
-        :param interface_name: The interface name
-        :param operation_name: The operation name within the interface
-        :param max_attempts: The maximum number of attempts in case the operation fails
-                             (if not specified the defaults it taken from the workflow context)
-        :param retry_interval: The interval in seconds between attempts when the operation fails
-                               (if not specified the defaults it taken from the workflow context)
-        :param ignore_failure: Whether to ignore failures
-                               (if not specified the defaults it taken from the workflow context)
-        :param inputs: Additional operation inputs
-        """
-
-        assert isinstance(node, models.Node)
-        return cls(
-            actor=node,
-            interface_name=interface_name,
-            operation_name=operation_name,
-            max_attempts=max_attempts,
-            retry_interval=retry_interval,
-            ignore_failure=ignore_failure,
-            inputs=inputs)
-
-    @classmethod
-    def for_relationship(cls,
-                         relationship,
-                         interface_name,
-                         operation_name,
-                         max_attempts=None,
-                         retry_interval=None,
-                         ignore_failure=None,
-                         inputs=None):
-        """
-        Creates an operation on a relationship edge.
-
-        :param relationship: The relationship on which to run the operation
-        :param interface_name: The interface name
-        :param operation_name: The operation name within the interface
-        :param max_attempts: The maximum number of attempts in case the operation fails
-                             (if not specified the defaults it taken from the workflow context)
-        :param retry_interval: The interval in seconds between attempts when the operation fails
-                               (if not specified the defaults it taken from the workflow context)
-        :param ignore_failure: Whether to ignore failures
-                               (if not specified the defaults it taken from the workflow context)
-        :param inputs: Additional operation inputs
-        """
 
-        assert isinstance(relationship, models.Relationship)
-        return cls(
-            actor=relationship,
-            interface_name=interface_name,
-            operation_name=operation_name,
-            max_attempts=max_attempts,
-            retry_interval=retry_interval,
-            ignore_failure=ignore_failure,
-            inputs=inputs)
+class StubTask(BaseTask):
+    """
+    Enables creating empty tasks.
+    """
 
 
 class WorkflowTask(BaseTask):
@@ -199,7 +142,83 @@ class WorkflowTask(BaseTask):
             return super(WorkflowTask, self).__getattribute__(item)
 
 
-class StubTask(BaseTask):
+def create_task(actor, interface_name, operation_name, **kwargs):
     """
-    Enables creating empty tasks.
+    This helper function enables safe creation of OperationTask, if the supplied interface or
+    operation do not exist, None is returned.
+    :param actor: the actor for this task
+    :param interface_name: the name of the interface
+    :param operation_name: the name of the operation
+    :param kwargs: any additional kwargs to be passed to the task OperationTask
+    :return: and OperationTask or None (if the interface/operation does not exists)
+    """
+    try:
+        return OperationTask(
+            actor,
+            interface_name=interface_name,
+            operation_name=operation_name,
+            **kwargs
+        )
+    except exceptions.OperationNotFoundException:
+        return None
+
+
+def create_relationships_tasks(
+        node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
+    """
+    Creates a relationship task (source and target) for all of a node_instance relationships.
+    :param basestring source_operation_name: the relationship operation name.
+    :param basestring interface_name: the name of the interface.
+    :param source_operation_name:
+    :param target_operation_name:
+    :param NodeInstance node: the source_node
+    :return:
+    """
+    sub_tasks = []
+    for relationship in node.outbound_relationships:
+        relationship_operations = create_relationship_tasks(
+            relationship,
+            interface_name,
+            source_operation_name=source_operation_name,
+            target_operation_name=target_operation_name,
+            **kwargs)
+        sub_tasks.append(relationship_operations)
+    return sub_tasks
+
+
+def create_relationship_tasks(relationship, interface_name, source_operation_name=None,
+                              target_operation_name=None, **kwargs):
+    """
+    Creates a relationship task source and target.
+    :param Relationship relationship: the relationship instance itself
+    :param source_operation_name:
+    :param target_operation_name:
+
+    :return:
     """
+    operations = []
+    if source_operation_name:
+        operations.append(
+            create_task(
+                relationship,
+                interface_name=interface_name,
+                operation_name=source_operation_name,
+                **kwargs
+            )
+        )
+    if target_operation_name:
+        operations.append(
+            create_task(
+                relationship,
+                interface_name=interface_name,
+                operation_name=target_operation_name,
+                **kwargs
+            )
+        )
+
+    return [o for o in operations if o]
+
+
+def has_operation(actor, interface_name, operation_name):
+    interface = actor.interfaces.get(interface_name, None)
+    return interface and interface.operations.get(operation_name, False)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/execute_operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py
index 16504ec..02a654a 100644
--- a/aria/orchestrator/workflows/builtin/execute_operation.py
+++ b/aria/orchestrator/workflows/builtin/execute_operation.py
@@ -17,8 +17,8 @@
 Builtin execute_operation workflow
 """
 
-from . import utils
 from ... import workflow
+from ..api import task
 
 
 @workflow
@@ -65,11 +65,11 @@ def execute_operation(
     # registering actual tasks to sequences
     for node in filtered_nodes:
         graph.add_tasks(
-            _create_node_task(
-                node=node,
+            task.OperationTask(
+                node,
                 interface_name=interface_name,
                 operation_name=operation_name,
-                operation_kwargs=operation_kwargs
+                inputs=operation_kwargs
             )
         )
 
@@ -99,23 +99,3 @@ def _filter_nodes(context, node_template_ids=(), node_ids=(), type_names=()):
                 _is_node_by_id(node.id),
                 _is_node_by_type(node.node_template.type))):
             yield node
-
-
-def _create_node_task(
-        node,
-        interface_name,
-        operation_name,
-        operation_kwargs):
-    """
-    A workflow which executes a single operation
-    :param node: the node instance to install
-    :param basestring operation: the operation name
-    :param dict operation_kwargs:
-    :return:
-    """
-
-    return utils.create_node_task(
-        node=node,
-        interface_name=interface_name,
-        operation_name=operation_name,
-        inputs=operation_kwargs)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/heal.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py
index 92b96ea..ca382e8 100644
--- a/aria/orchestrator/workflows/builtin/heal.py
+++ b/aria/orchestrator/workflows/builtin/heal.py
@@ -103,7 +103,7 @@ def heal_uninstall(ctx, graph, failing_nodes, targeted_nodes):
             graph.add_dependency(target_node_subgraph, node_sub_workflow)
 
             if target_node in failing_nodes:
-                dependency = relationship_tasks(
+                dependency = task.create_relationship_tasks(
                     relationship=relationship,
                     operation_name='aria.interfaces.relationship_lifecycle.unlink')
                 graph.add_tasks(*dependency)
@@ -157,7 +157,7 @@ def heal_install(ctx, graph, failing_nodes, targeted_nodes):
             graph.add_dependency(node_sub_workflow, target_node_subworkflow)
 
             if target_node in failing_nodes:
-                dependent = relationship_tasks(
+                dependent = task.create_relationship_tasks(
                     relationship=relationship,
                     operation_name='aria.interfaces.relationship_lifecycle.establish')
                 graph.add_tasks(*dependent)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/install.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py
index 2b9ec66..821b190 100644
--- a/aria/orchestrator/workflows/builtin/install.py
+++ b/aria/orchestrator/workflows/builtin/install.py
@@ -17,16 +17,15 @@
 Builtin install workflow
 """
 
-from .workflows import install_node
-from .utils import create_node_task_dependencies
-from ..api.task import WorkflowTask
 from ... import workflow
+from ..api import task as api_task
+from . import workflows
 
 
 @workflow
 def install(ctx, graph):
     tasks_and_nodes = []
     for node in ctx.nodes:
-        tasks_and_nodes.append((WorkflowTask(install_node, node=node), node))
+        tasks_and_nodes.append((api_task.WorkflowTask(workflows.install_node, node=node), node))
     graph.add_tasks([task for task, _ in tasks_and_nodes])
-    create_node_task_dependencies(graph, tasks_and_nodes)
+    workflows.create_node_task_dependencies(graph, tasks_and_nodes)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/start.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/start.py b/aria/orchestrator/workflows/builtin/start.py
index ad67554..1946143 100644
--- a/aria/orchestrator/workflows/builtin/start.py
+++ b/aria/orchestrator/workflows/builtin/start.py
@@ -18,11 +18,11 @@ Builtin start workflow
 """
 
 from .workflows import start_node
-from ..api.task import WorkflowTask
 from ... import workflow
+from ..api import task as api_task
 
 
 @workflow
 def start(ctx, graph):
     for node in ctx.model.node.iter():
-        graph.add_tasks(WorkflowTask(start_node, node=node))
+        graph.add_tasks(api_task.WorkflowTask(start_node, node=node))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/stop.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/stop.py b/aria/orchestrator/workflows/builtin/stop.py
index 23ac366..c1b60ae 100644
--- a/aria/orchestrator/workflows/builtin/stop.py
+++ b/aria/orchestrator/workflows/builtin/stop.py
@@ -18,11 +18,11 @@ Builtin stop workflow
 """
 
 from .workflows import stop_node
-from ..api.task import WorkflowTask
+from ..api import task as api_task
 from ... import workflow
 
 
 @workflow
 def stop(ctx, graph):
     for node in ctx.model.node.iter():
-        graph.add_tasks(WorkflowTask(stop_node, node=node))
+        graph.add_tasks(api_task.WorkflowTask(stop_node, node=node))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/uninstall.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py
index e4afcd9..c35117e 100644
--- a/aria/orchestrator/workflows/builtin/uninstall.py
+++ b/aria/orchestrator/workflows/builtin/uninstall.py
@@ -17,18 +17,15 @@
 Builtin uninstall workflow
 """
 
-from .workflows import uninstall_node
-from .utils import create_node_task_dependencies
-from ..api.task import WorkflowTask
 from ... import workflow
+from ..api import task as api_task
+from . import workflows
 
 
 @workflow
 def uninstall(ctx, graph):
     tasks_and_nodes = []
     for node in ctx.nodes:
-        tasks_and_nodes.append((
-            WorkflowTask(uninstall_node, node=node),
-            node))
+        tasks_and_nodes.append((api_task.WorkflowTask(workflows.uninstall_node, node=node), node))
     graph.add_tasks([task for task, _ in tasks_and_nodes])
-    create_node_task_dependencies(graph, tasks_and_nodes, reverse=True)
+    workflows.create_node_task_dependencies(graph, tasks_and_nodes, reverse=True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
deleted file mode 100644
index 2254d13..0000000
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ /dev/null
@@ -1,138 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from ..api.task import OperationTask, StubTask
-from .. import exceptions
-
-
-def create_node_task(node, interface_name, operation_name, **kwargs):
-    """
-    Returns a new operation task if the operation exists in the node, otherwise returns None.
-    """
-
-    try:
-        if _is_empty_task(node, interface_name, operation_name):
-            return StubTask()
-
-        return OperationTask.for_node(node=node,
-                                      interface_name=interface_name,
-                                      operation_name=operation_name,
-                                      **kwargs)
-    except exceptions.OperationNotFoundException:
-        # We will skip nodes which do not have the operation
-        return None
-
-
-def create_relationships_tasks(
-        node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
-    """
-    Creates a relationship task (source and target) for all of a node_instance relationships.
-    :param basestring source_operation_name: the relationship operation name.
-    :param basestring interface_name: the name of the interface.
-    :param source_operation_name:
-    :param target_operation_name:
-    :param NodeInstance node: the source_node
-    :return:
-    """
-    sub_tasks = []
-    for relationship in node.outbound_relationships:
-        relationship_operations = relationship_tasks(
-            relationship,
-            interface_name,
-            source_operation_name=source_operation_name,
-            target_operation_name=target_operation_name,
-            **kwargs)
-        sub_tasks.append(relationship_operations)
-    return sub_tasks
-
-
-def relationship_tasks(relationship, interface_name, source_operation_name=None,
-                       target_operation_name=None, **kwargs):
-    """
-    Creates a relationship task source and target.
-    :param Relationship relationship: the relationship instance itself
-    :param source_operation_name:
-    :param target_operation_name:
-
-    :return:
-    """
-    operations = []
-    if source_operation_name:
-        try:
-            if _is_empty_task(relationship, interface_name, source_operation_name):
-                operations.append(StubTask())
-            else:
-                operations.append(
-                    OperationTask.for_relationship(relationship=relationship,
-                                                   interface_name=interface_name,
-                                                   operation_name=source_operation_name,
-                                                   **kwargs)
-                )
-        except exceptions.OperationNotFoundException:
-            # We will skip relationships which do not have the operation
-            pass
-    if target_operation_name:
-        try:
-            if _is_empty_task(relationship, interface_name, target_operation_name):
-                operations.append(StubTask())
-            else:
-                operations.append(
-                    OperationTask.for_relationship(relationship=relationship,
-                                                   interface_name=interface_name,
-                                                   operation_name=target_operation_name,
-                                                   **kwargs)
-                )
-        except exceptions.OperationNotFoundException:
-            # We will skip relationships which do not have the operation
-            pass
-
-    return operations
-
-
-def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
-    """
-    Creates dependencies between tasks if there is a relationship (outbound) between their nodes.
-    """
-
-    def get_task(node_name):
-        for task, node in tasks_and_nodes:
-            if node.name == node_name:
-                return task
-        return None
-
-    for task, node in tasks_and_nodes:
-        dependencies = []
-        for relationship in node.outbound_relationships:
-            dependency = get_task(relationship.target_node.name)
-            if dependency:
-                dependencies.append(dependency)
-        if dependencies:
-            if reverse:
-                for dependency in dependencies:
-                    graph.add_dependency(dependency, task)
-            else:
-                graph.add_dependency(task, dependencies)
-
-
-def _is_empty_task(actor, interface_name, operation_name):
-    interface = actor.interfaces.get(interface_name)
-    if interface:
-        operation = interface.operations.get(operation_name)
-        if operation:
-            return operation.implementation is None
-
-    raise exceptions.OperationNotFoundException(
-        'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
-        .format(operation_name, interface_name, type(actor).__name__.lower(), actor.name))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/builtin/workflows.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py
index 60f14ed..b286e98 100644
--- a/aria/orchestrator/workflows/builtin/workflows.py
+++ b/aria/orchestrator/workflows/builtin/workflows.py
@@ -18,10 +18,7 @@ TSOCA normative lifecycle workflows.
 """
 
 from ... import workflow
-from .utils import (
-    create_node_task,
-    create_relationships_tasks
-)
+from ..api import task
 
 
 NORMATIVE_STANDARD_INTERFACE = 'Standard' # 'tosca.interfaces.node.lifecycle.Standard'
@@ -72,19 +69,18 @@ __all__ = (
 @workflow(suffix_template='{node.name}')
 def install_node(graph, node, **kwargs):
     # Create
-    sequence = [create_node_task(node,
-                                 NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
+    sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CREATE)]
 
     # Configure
-    sequence += create_relationships_tasks(node,
-                                           NORMATIVE_CONFIGURE_INTERFACE,
-                                           NORMATIVE_PRE_CONFIGURE_SOURCE,
-                                           NORMATIVE_PRE_CONFIGURE_TARGET)
-    sequence.append(create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
-    sequence += create_relationships_tasks(node,
-                                           NORMATIVE_CONFIGURE_INTERFACE,
-                                           NORMATIVE_POST_CONFIGURE_SOURCE,
-                                           NORMATIVE_POST_CONFIGURE_TARGET)
+    sequence += task.create_relationships_tasks(node,
+                                                NORMATIVE_CONFIGURE_INTERFACE,
+                                                NORMATIVE_PRE_CONFIGURE_SOURCE,
+                                                NORMATIVE_PRE_CONFIGURE_TARGET)
+    sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_CONFIGURE))
+    sequence += task.create_relationships_tasks(node,
+                                                NORMATIVE_CONFIGURE_INTERFACE,
+                                                NORMATIVE_POST_CONFIGURE_SOURCE,
+                                                NORMATIVE_POST_CONFIGURE_TARGET)
     # Start
     sequence += _create_start_tasks(node)
 
@@ -97,9 +93,7 @@ def uninstall_node(graph, node, **kwargs):
     sequence = _create_stop_tasks(node)
 
     # Delete
-    sequence.append(create_node_task(node,
-                                     NORMATIVE_STANDARD_INTERFACE,
-                                     NORMATIVE_DELETE))
+    sequence.append(task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_DELETE))
 
     graph.sequence(*sequence)
 
@@ -115,16 +109,41 @@ def stop_node(graph, node, **kwargs):
 
 
 def _create_start_tasks(node):
-    sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
-    sequence += create_relationships_tasks(node,
-                                           NORMATIVE_CONFIGURE_INTERFACE,
-                                           NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET)
+    sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_START)]
+    sequence += task.create_relationships_tasks(node,
+                                                NORMATIVE_CONFIGURE_INTERFACE,
+                                                NORMATIVE_ADD_SOURCE, NORMATIVE_ADD_TARGET)
     return sequence
 
 
 def _create_stop_tasks(node):
-    sequence = [create_node_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
-    sequence += create_relationships_tasks(node,
-                                           NORMATIVE_CONFIGURE_INTERFACE,
-                                           NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET)
+    sequence = [task.create_task(node, NORMATIVE_STANDARD_INTERFACE, NORMATIVE_STOP)]
+    sequence += task.create_relationships_tasks(node,
+                                                NORMATIVE_CONFIGURE_INTERFACE,
+                                                NORMATIVE_REMOVE_SOURCE, NORMATIVE_REMOVE_TARGET)
     return sequence
+
+
+def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
+    """
+    Creates dependencies between tasks if there is a relationship (outbound) between their nodes.
+    """
+
+    def get_task(node_name):
+        for api_task, task_node in tasks_and_nodes:
+            if task_node.name == node_name:
+                return api_task
+        return None
+
+    for api_task, node in tasks_and_nodes:
+        dependencies = []
+        for relationship in node.outbound_relationships:
+            dependency = get_task(relationship.target_node.name)
+            if dependency:
+                dependencies.append(dependency)
+        if dependencies:
+            if reverse:
+                for dependency in dependencies:
+                    graph.add_dependency(dependency, api_task)
+            else:
+                graph.add_dependency(api_task, dependencies)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 155d0ee..fd0dd6d 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -44,7 +44,8 @@ class Engine(logger.LoggerMixin):
         self._execution_graph = networkx.DiGraph()
         self._executor = executor
         translation.build_execution_graph(task_graph=tasks_graph,
-                                          execution_graph=self._execution_graph)
+                                          execution_graph=self._execution_graph,
+                                          default_executor=self._executor)
 
     def execute(self):
         """
@@ -109,12 +110,11 @@ class Engine(logger.LoggerMixin):
                     self._workflow_context.model.task.refresh(task.model_task)
             yield task
 
-    def _handle_executable_task(self, task):
-        if isinstance(task, engine_task.StubTask):
-            task.status = models.Task.SUCCESS
-        else:
+    @staticmethod
+    def _handle_executable_task(task):
+        if isinstance(task, engine_task.OperationTask):
             events.sent_task_signal.send(task)
-            self._executor.execute(task)
+        task.execute()
 
     def _handle_ended_tasks(self, task):
         if task.status == models.Task.FAILED and not task.ignore_failure:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 88c24bd..f3e4e7e 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -40,7 +40,6 @@ def _task_started(task, *args, **kwargs):
     with task._update():
         task.started_at = datetime.utcnow()
         task.status = task.STARTED
-
         _update_node_state_if_necessary(task, is_transitional=True)
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 8adeb7e..0e081c2 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -47,9 +47,13 @@ class BaseTask(object):
     Base class for Task objects
     """
 
-    def __init__(self, id, *args, **kwargs):
+    def __init__(self, id, executor, *args, **kwargs):
         super(BaseTask, self).__init__(*args, **kwargs)
         self._id = id
+        self._executor = executor
+
+    def execute(self):
+        return self._executor.execute(self)
 
     @property
     def id(self):
@@ -61,8 +65,11 @@ class BaseTask(object):
 
 class StubTask(BaseTask):
     """
-    Base stub task for all tasks that don't actually run anything
+    Base stub task for marker user tasks that only mark the start/end of a workflow
+    or sub-workflow
     """
+    STARTED = models.Task.STARTED
+    SUCCESS = models.Task.SUCCESS
 
     def __init__(self, *args, **kwargs):
         super(StubTask, self).__init__(*args, **kwargs)
@@ -70,10 +77,10 @@ class StubTask(BaseTask):
         self.due_at = datetime.utcnow()
 
     def has_ended(self):
-        return self.status in (models.Task.SUCCESS, models.Task.FAILED)
+        return self.status == self.SUCCESS
 
     def is_waiting(self):
-        return self.status in (models.Task.PENDING, models.Task.RETRYING)
+        return not self.has_ended()
 
 
 class StartWorkflowTask(StubTask):
@@ -108,14 +115,14 @@ class OperationTask(BaseTask):
     """
     Operation task
     """
-
     def __init__(self, api_task, *args, **kwargs):
-        super(OperationTask, self).__init__(id=api_task.id, **kwargs)
+        # If no executor is provided, we infer that this is an empty task which does not need to be
+        # executed.
+        super(OperationTask, self).__init__(id=api_task.id, *args, **kwargs)
         self._workflow_context = api_task._workflow_context
         self.interface_name = api_task.interface_name
         self.operation_name = api_task.operation_name
         model_storage = api_task._workflow_context.model
-        plugin = api_task.plugin
 
         base_task_model = model_storage.task.model_cls
         if isinstance(api_task.actor, models.Node):
@@ -130,15 +137,18 @@ class OperationTask(BaseTask):
 
         task_model = create_task_model(
             name=api_task.name,
-            implementation=api_task.implementation,
             actor=api_task.actor,
-            inputs=api_task.inputs,
             status=base_task_model.PENDING,
             max_attempts=api_task.max_attempts,
             retry_interval=api_task.retry_interval,
             ignore_failure=api_task.ignore_failure,
-            plugin=plugin,
-            execution=self._workflow_context.execution
+            execution=self._workflow_context.execution,
+
+            # Only non-stub tasks have these fields
+            plugin=api_task.plugin,
+            implementation=api_task.implementation,
+            inputs=api_task.inputs
+
         )
         self._workflow_context.model.task.put(task_model)
 
@@ -153,6 +163,9 @@ class OperationTask(BaseTask):
         self._task_id = task_model.id
         self._update_fields = None
 
+    def execute(self):
+        super(OperationTask, self).execute()
+
     @contextmanager
     def _update(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index b6cbdad..0bbce90 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -18,12 +18,14 @@ Translation of user graph's API to the execution graph
 """
 
 from .. import api
+from ..executor import base
 from . import task as core_task
 
 
 def build_execution_graph(
         task_graph,
         execution_graph,
+        default_executor,
         start_cls=core_task.StartWorkflowTask,
         end_cls=core_task.EndWorkflowTask,
         depends_on=()):
@@ -37,31 +39,33 @@ def build_execution_graph(
     :param depends_on: internal use
     """
     # Insert start marker
-    start_task = start_cls(id=_start_graph_suffix(task_graph.id))
+    start_task = start_cls(id=_start_graph_suffix(task_graph.id), executor=base.StubTaskExecutor())
     _add_task_and_dependencies(execution_graph, start_task, depends_on)
 
     for api_task in task_graph.topological_order(reverse=True):
         dependencies = task_graph.get_dependencies(api_task)
         operation_dependencies = _get_tasks_from_dependencies(
-            execution_graph,
-            dependencies,
-            default=[start_task])
+            execution_graph, dependencies, default=[start_task])
 
         if isinstance(api_task, api.task.OperationTask):
-            # Add the task an the dependencies
-            operation_task = core_task.OperationTask(api_task)
+            if api_task.implementation:
+                operation_task = core_task.OperationTask(api_task, executor=default_executor)
+            else:
+                operation_task = core_task.OperationTask(api_task,
+                                                         executor=base.EmptyOperationExecutor())
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
             build_execution_graph(
                 task_graph=api_task,
                 execution_graph=execution_graph,
+                default_executor=default_executor,
                 start_cls=core_task.StartSubWorkflowTask,
                 end_cls=core_task.EndSubWorkflowTask,
                 depends_on=operation_dependencies
             )
         elif isinstance(api_task, api.task.StubTask):
-            stub_task = core_task.StubTask(id=api_task.id)
+            stub_task = core_task.StubTask(id=api_task.id, executor=base.StubTaskExecutor())
             _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
         else:
             raise RuntimeError('Undefined state')
@@ -71,7 +75,7 @@ def build_execution_graph(
         execution_graph,
         _get_non_dependency_tasks(task_graph),
         default=[start_task])
-    end_task = end_cls(id=_end_graph_suffix(task_graph.id))
+    end_task = end_cls(id=_end_graph_suffix(task_graph.id), executor=base.StubTaskExecutor())
     _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies)
 
 
@@ -85,11 +89,14 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
     """
     Returns task list from dependencies.
     """
-    return [execution_graph.node[dependency.id
-                                 if isinstance(dependency, (api.task.OperationTask,
-                                                            api.task.StubTask))
-                                 else _end_graph_suffix(dependency.id)]['task']
-            for dependency in dependencies] or default
+    tasks = []
+    for dependency in dependencies:
+        if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)):
+            dependency_id = dependency.id
+        else:
+            dependency_id = _end_graph_suffix(dependency.id)
+        tasks.append(execution_graph.node[dependency_id]['task'])
+    return tasks or default
 
 
 def _start_graph_suffix(id):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 3ffe18b..236a55f 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -35,12 +35,21 @@ def _get_task_name(task):
 
 @events.start_task_signal.connect
 def _start_task_handler(task, **kwargs):
-    task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...'
-                             .format(name=_get_task_name(task), task=task))
+    # If the task has not implementation this is an empty task.
+    if task.implementation:
+        suffix = 'started...'
+        logger = task.context.logger.info
+    else:
+        suffix = 'has no implementation'
+        logger = task.context.logger.debug
 
+    logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
+        name=_get_task_name(task), task=task, suffix=suffix))
 
 @events.on_success_task_signal.connect
 def _success_task_handler(task, **kwargs):
+    if not task.implementation:
+        return
     task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
                              .format(name=_get_task_name(task), task=task))
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 39becef..a225837 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -50,3 +50,14 @@ class BaseExecutor(logger.LoggerMixin):
     @staticmethod
     def _task_succeeded(task):
         events.on_success_task_signal.send(task)
+
+
+class StubTaskExecutor(BaseExecutor):
+    def execute(self, task):
+        task.status = task.SUCCESS
+
+
+class EmptyOperationExecutor(BaseExecutor):
+    def execute(self, task):
+        events.start_task_signal.send(task)
+        events.on_success_task_signal.send(task)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index e1261bb..eb70a41 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -16,7 +16,6 @@
 """
 Dry executor
 """
-
 from datetime import datetime
 
 from .base import BaseExecutor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
----------------------------------------------------------------------
diff --git a/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml b/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
index ff6ba6c..1e83ef9 100644
--- a/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
+++ b/extensions/aria_extension_tosca/profiles/tosca-simple-1.0/interfaces.yaml
@@ -100,3 +100,8 @@ interface_types:
         Operation to remove a target node.
       _extensions:
         relationship_edge: source
+    remove_source:
+      description: >-
+        Operation to remove the source node.
+      _extensions:
+        relationship_edge: target

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/end2end/test_hello_world.py
----------------------------------------------------------------------
diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py
index 09e5d06..fc5f631 100644
--- a/tests/end2end/test_hello_world.py
+++ b/tests/end2end/test_hello_world.py
@@ -29,8 +29,7 @@ def test_hello_world(testenv):
     finally:
         # Even if some assertions failed, attempt to execute uninstall so the
         # webserver process doesn't stay up once the test is finished
-        # TODO: remove force_service_delete=True
-        testenv.uninstall_service(force_service_delete=True)
+        testenv.uninstall_service()
 
     _verify_webserver_down('http://localhost:9090')
     testenv.verify_clean_storage()
@@ -57,5 +56,5 @@ def _verify_deployed_service_in_storage(service_name, model_storage):
     assert service.name == service_name
     assert len(service.executions) == 1
     assert len(service.nodes) == 2
-    # TODO: validate node states
+    assert all(node.state == node.STARTED for node in service.nodes.values())
     assert len(service.executions[0].logs) > 0

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/end2end/testenv.py
----------------------------------------------------------------------
diff --git a/tests/end2end/testenv.py b/tests/end2end/testenv.py
index 3950b20..85714e5 100644
--- a/tests/end2end/testenv.py
+++ b/tests/end2end/testenv.py
@@ -70,7 +70,7 @@ class TestEnvironment(object):
         assert len(self.model_storage.log.list()) == 0
 
     def _get_cli(self):
-        cli = sh.aria.bake(_out=sys.stdout.write, _err=sys.stderr.write)
+        cli = sh.aria.bake('-vvv', _out=sys.stdout.write, _err=sys.stderr.write)
 
         # the `sh` library supports underscore-dash auto-replacement for commands and option flags
         # yet not for subcommands (e.g. `aria service-templates`); The following class fixes this.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index c399474..971e0db 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -84,10 +84,10 @@ def test_node_operation_task_execution(ctx, thread_executor):
     @workflow
     def basic_workflow(graph, **_):
         graph.add_tasks(
-            api.task.OperationTask.for_node(
+            api.task.OperationTask(
+                node,
                 interface_name=interface_name,
                 operation_name=operation_name,
-                node=node,
                 inputs=inputs
             )
         )
@@ -141,8 +141,8 @@ def test_relationship_operation_task_execution(ctx, thread_executor):
     @workflow
     def basic_workflow(graph, **_):
         graph.add_tasks(
-            api.task.OperationTask.for_relationship(
-                relationship=relationship,
+            api.task.OperationTask(
+                relationship,
                 interface_name=interface_name,
                 operation_name=operation_name,
                 inputs=inputs
@@ -209,9 +209,10 @@ def test_invalid_task_operation_id(ctx, thread_executor):
     @workflow
     def basic_workflow(graph, **_):
         graph.add_tasks(
-            api.task.OperationTask.for_node(node=node,
-                                            interface_name=interface_name,
-                                            operation_name=operation_name)
+            api.task.OperationTask(
+                node,
+                interface_name=interface_name,
+                operation_name=operation_name)
         )
 
     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
@@ -250,10 +251,11 @@ def test_plugin_workdir(ctx, thread_executor, tmpdir):
 
     @workflow
     def basic_workflow(graph, **_):
-        graph.add_tasks(api.task.OperationTask.for_node(node=node,
-                                                        interface_name=interface_name,
-                                                        operation_name=operation_name,
-                                                        inputs=inputs))
+        graph.add_tasks(api.task.OperationTask(
+            node,
+            interface_name=interface_name,
+            operation_name=operation_name,
+            inputs=inputs))
 
     execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
     expected_file = tmpdir.join('workdir', 'plugins', str(ctx.service.id),
@@ -298,10 +300,10 @@ def test_node_operation_logging(ctx, executor):
     @workflow
     def basic_workflow(graph, **_):
         graph.add_tasks(
-            api.task.OperationTask.for_node(
+            api.task.OperationTask(
+                node,
                 interface_name=interface_name,
                 operation_name=operation_name,
-                node=node,
                 inputs=inputs
             )
         )
@@ -331,10 +333,10 @@ def test_relationship_operation_logging(ctx, executor):
     @workflow
     def basic_workflow(graph, **_):
         graph.add_tasks(
-            api.task.OperationTask.for_relationship(
+            api.task.OperationTask(
+                relationship,
                 interface_name=interface_name,
                 operation_name=operation_name,
-                relationship=relationship,
                 inputs=inputs
             )
         )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index f4acc36..8a5db6f 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -51,7 +51,7 @@ def _mock_workflow(ctx, graph):
                               plugin=plugin)
     )
     node.interfaces[interface.name] = interface
-    task = api.task.OperationTask.for_node(node=node, interface_name='test', operation_name='op')
+    task = api.task.OperationTask(node, interface_name='test', operation_name='op')
     graph.add_tasks(task)
     return graph
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index 213d964..ecc3ac2 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -90,8 +90,8 @@ def test_host_ip(workflow_context, executor):
     @workflow
     def basic_workflow(graph, **_):
         graph.add_tasks(
-            api.task.OperationTask.for_node(
-                node=dependency_node,
+            api.task.OperationTask(
+                dependency_node,
                 interface_name=interface_name,
                 operation_name=operation_name,
                 inputs=inputs
@@ -121,8 +121,8 @@ def test_relationship_tool_belt(workflow_context, executor):
     @workflow
     def basic_workflow(graph, **_):
         graph.add_tasks(
-            api.task.OperationTask.for_relationship(
-                relationship=relationship,
+            api.task.OperationTask(
+                relationship,
                 interface_name=interface_name,
                 operation_name=operation_name,
                 inputs=inputs

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 58506ba..09d0499 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -489,8 +489,8 @@ if __name__ == '__main__':
                     inputs=inputs)
             )
             node.interfaces[interface.name] = interface
-            graph.add_tasks(api.task.OperationTask.for_node(
-                node=node,
+            graph.add_tasks(api.task.OperationTask(
+                node,
                 interface_name='test',
                 operation_name='op',
                 inputs=inputs))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index a75d59a..a9dc5e8 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -245,8 +245,8 @@ class TestWithActualSSHServer(object):
             for test_operation in test_operations:
                 op_inputs = inputs.copy()
                 op_inputs['test_operation'] = test_operation
-                ops.append(api.task.OperationTask.for_node(
-                    node=node,
+                ops.append(api.task.OperationTask(
+                    node,
                     interface_name='test',
                     operation_name='op',
                     inputs=op_inputs))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/api/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/api/test_task.py b/tests/orchestrator/workflows/api/test_task.py
index ab62361..642c785 100644
--- a/tests/orchestrator/workflows/api/test_task.py
+++ b/tests/orchestrator/workflows/api/test_task.py
@@ -62,8 +62,8 @@ class TestOperationTask(object):
         ignore_failure = True
 
         with context.workflow.current.push(ctx):
-            api_task = api.task.OperationTask.for_node(
-                node=node,
+            api_task = api.task.OperationTask(
+                node,
                 interface_name=interface_name,
                 operation_name=operation_name,
                 inputs=inputs,
@@ -109,8 +109,8 @@ class TestOperationTask(object):
         retry_interval = 10
 
         with context.workflow.current.push(ctx):
-            api_task = api.task.OperationTask.for_relationship(
-                relationship=relationship,
+            api_task = api.task.OperationTask(
+                relationship,
                 interface_name=interface_name,
                 operation_name=operation_name,
                 inputs=inputs,
@@ -154,8 +154,8 @@ class TestOperationTask(object):
         retry_interval = 10
 
         with context.workflow.current.push(ctx):
-            api_task = api.task.OperationTask.for_relationship(
-                relationship=relationship,
+            api_task = api.task.OperationTask(
+                relationship,
                 interface_name=interface_name,
                 operation_name=operation_name,
                 inputs=inputs,
@@ -193,8 +193,8 @@ class TestOperationTask(object):
         dependency_node.interfaces[interface_name] = interface
 
         with context.workflow.current.push(ctx):
-            task = api.task.OperationTask.for_node(
-                node=dependency_node,
+            task = api.task.OperationTask(
+                dependency_node,
                 interface_name=interface_name,
                 operation_name=operation_name)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index af9af17..8c0705b 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -55,34 +55,32 @@ class BaseTest(object):
                              tasks_graph=graph)
 
     @staticmethod
-    def _op(func, ctx,
+    def _op(ctx,
+            func,
             inputs=None,
             max_attempts=None,
             retry_interval=None,
             ignore_failure=None):
         node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-
+        interface_name = 'aria.interfaces.lifecycle'
         operation_kwargs = dict(implementation='{name}.{func.__name__}'.format(
             name=__name__, func=func))
         if inputs:
             # the operation has to declare the inputs before those may be passed
             operation_kwargs['inputs'] = inputs
-
-        interface = mock.models.create_interface(
-            node.service,
-            'aria.interfaces.lifecycle',
-            'create',
-            operation_kwargs=operation_kwargs
-        )
+        operation_name = 'create'
+        interface = mock.models.create_interface(node.service, interface_name, operation_name,
+                                                 operation_kwargs=operation_kwargs)
         node.interfaces[interface.name] = interface
-        return api.task.OperationTask.for_node(
-            node=node,
+
+        return api.task.OperationTask(
+            node,
             interface_name='aria.interfaces.lifecycle',
-            operation_name='create',
-            inputs=inputs,
+            operation_name=operation_name,
+            inputs=inputs or {},
             max_attempts=max_attempts,
             retry_interval=retry_interval,
-            ignore_failure=ignore_failure
+            ignore_failure=ignore_failure,
         )
 
     @pytest.fixture(autouse=True)
@@ -162,7 +160,7 @@ class TestEngine(BaseTest):
     def test_single_task_successful_execution(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            graph.add_tasks(self._op(mock_success_task, ctx))
+            graph.add_tasks(self._op(ctx, func=mock_success_task))
         self._execute(
             workflow_func=mock_workflow,
             workflow_context=workflow_context,
@@ -174,7 +172,7 @@ class TestEngine(BaseTest):
     def test_single_task_failed_execution(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            graph.add_tasks(self._op(mock_failed_task, ctx))
+            graph.add_tasks(self._op(ctx, func=mock_failed_task))
         with pytest.raises(exceptions.ExecutorException):
             self._execute(
                 workflow_func=mock_workflow,
@@ -191,8 +189,8 @@ class TestEngine(BaseTest):
     def test_two_tasks_execution_order(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
-            op2 = self._op(mock_ordered_task, ctx, inputs={'counter': 2})
+            op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1})
+            op2 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2})
             graph.sequence(op1, op2)
         self._execute(
             workflow_func=mock_workflow,
@@ -206,9 +204,9 @@ class TestEngine(BaseTest):
     def test_stub_and_subworkflow_execution(self, workflow_context, executor):
         @workflow
         def sub_workflow(ctx, graph):
-            op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
+            op1 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 1})
             op2 = api.task.StubTask()
-            op3 = self._op(mock_ordered_task, ctx, inputs={'counter': 2})
+            op3 = self._op(ctx, func=mock_ordered_task, inputs={'counter': 2})
             graph.sequence(op1, op2, op3)
 
         @workflow
@@ -231,7 +229,7 @@ class TestCancel(BaseTest):
         @workflow
         def mock_workflow(ctx, graph):
             operations = (
-                self._op(mock_sleep_task, ctx, inputs=dict(seconds=0.1))
+                self._op(ctx, func=mock_sleep_task, inputs=dict(seconds=0.1))
                 for _ in range(number_of_tasks)
             )
             return graph.sequence(*operations)
@@ -271,7 +269,7 @@ class TestRetries(BaseTest):
     def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(mock_conditional_failure_task, ctx,
+            op = self._op(ctx, func=mock_conditional_failure_task,
                           inputs={'failure_count': 1},
                           max_attempts=2)
             graph.add_tasks(op)
@@ -287,7 +285,7 @@ class TestRetries(BaseTest):
     def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(mock_conditional_failure_task, ctx,
+            op = self._op(ctx, func=mock_conditional_failure_task,
                           inputs={'failure_count': 2},
                           max_attempts=2)
             graph.add_tasks(op)
@@ -304,7 +302,7 @@ class TestRetries(BaseTest):
     def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(mock_conditional_failure_task, ctx,
+            op = self._op(ctx, func=mock_conditional_failure_task,
                           inputs={'failure_count': 1},
                           max_attempts=3)
             graph.add_tasks(op)
@@ -320,7 +318,7 @@ class TestRetries(BaseTest):
     def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(mock_conditional_failure_task, ctx,
+            op = self._op(ctx, func=mock_conditional_failure_task,
                           inputs={'failure_count': 2},
                           max_attempts=3)
             graph.add_tasks(op)
@@ -336,7 +334,7 @@ class TestRetries(BaseTest):
     def test_infinite_retries(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(mock_conditional_failure_task, ctx,
+            op = self._op(ctx, func=mock_conditional_failure_task,
                           inputs={'failure_count': 1},
                           max_attempts=-1)
             graph.add_tasks(op)
@@ -362,7 +360,7 @@ class TestRetries(BaseTest):
     def _test_retry_interval(self, retry_interval, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(mock_conditional_failure_task, ctx,
+            op = self._op(ctx, func=mock_conditional_failure_task,
                           inputs={'failure_count': 1},
                           max_attempts=2,
                           retry_interval=retry_interval)
@@ -382,7 +380,7 @@ class TestRetries(BaseTest):
     def test_ignore_failure(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(mock_conditional_failure_task, ctx,
+            op = self._op(ctx, func=mock_conditional_failure_task,
                           ignore_failure=True,
                           inputs={'failure_count': 100},
                           max_attempts=100)
@@ -406,7 +404,7 @@ class TestTaskRetryAndAbort(BaseTest):
 
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(mock_task_retry, ctx,
+            op = self._op(ctx, func=mock_task_retry,
                           inputs={'message': self.message},
                           retry_interval=default_retry_interval,
                           max_attempts=2)
@@ -430,7 +428,7 @@ class TestTaskRetryAndAbort(BaseTest):
 
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(mock_task_retry, ctx,
+            op = self._op(ctx, func=mock_task_retry,
                           inputs={'message': self.message,
                                   'retry_interval': custom_retry_interval},
                           retry_interval=default_retry_interval,
@@ -453,7 +451,7 @@ class TestTaskRetryAndAbort(BaseTest):
     def test_task_abort(self, workflow_context, executor):
         @workflow
         def mock_workflow(ctx, graph):
-            op = self._op(mock_task_abort, ctx,
+            op = self._op(ctx, func=mock_task_abort,
                           inputs={'message': self.message},
                           retry_interval=100,
                           max_attempts=100)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index b9bff77..184071d 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -136,8 +136,8 @@ def _assert_node_state_changed_as_a_result_of_standard_lifecycle_operation(node,
 
 @workflow
 def single_operation_workflow(graph, node, interface_name, op_name, **_):
-    graph.add_tasks(api.task.OperationTask.for_node(
-        node=node,
+    graph.add_tasks(api.task.OperationTask(
+        node,
         interface_name=interface_name,
         operation_name=op_name))
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 0765350..748ee20 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -24,6 +24,7 @@ from aria.orchestrator.workflows import (
     api,
     core,
     exceptions,
+    executor
 )
 
 from tests import mock, storage
@@ -66,20 +67,22 @@ class TestOperationTask(object):
 
     def _create_node_operation_task(self, ctx, node):
         with workflow_context.current.push(ctx):
-            api_task = api.task.OperationTask.for_node(
-                node=node,
+            api_task = api.task.OperationTask(
+                node,
                 interface_name=NODE_INTERFACE_NAME,
                 operation_name=NODE_OPERATION_NAME)
-            core_task = core.task.OperationTask(api_task=api_task)
+            core_task = core.task.OperationTask(api_task=api_task,
+                                                executor=executor.base.EmptyOperationExecutor())
         return api_task, core_task
 
     def _create_relationship_operation_task(self, ctx, relationship):
         with workflow_context.current.push(ctx):
-            api_task = api.task.OperationTask.for_relationship(
-                relationship=relationship,
+            api_task = api.task.OperationTask(
+                relationship,
                 interface_name=RELATIONSHIP_INTERFACE_NAME,
                 operation_name=RELATIONSHIP_OPERATION_NAME)
-            core_task = core.task.OperationTask(api_task=api_task)
+            core_task = core.task.OperationTask(api_task=api_task,
+                                                executor=executor.base.EmptyOperationExecutor())
         return api_task, core_task
 
     def test_node_operation_task_creation(self, ctx):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 514bce9..2a96d01 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -17,6 +17,7 @@ from networkx import topological_sort, DiGraph
 
 from aria.orchestrator import context
 from aria.orchestrator.workflows import api, core
+from aria.orchestrator.workflows.executor import base
 
 from tests import mock
 from tests import storage
@@ -41,17 +42,20 @@ def test_task_graph_into_execution_graph(tmpdir):
 
     with context.workflow.current.push(task_context):
         test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
-        simple_before_task = api.task.OperationTask.for_node(node=node,
-                                                             interface_name=interface_name,
-                                                             operation_name=operation_name)
-        simple_after_task = api.task.OperationTask.for_node(node=node,
-                                                            interface_name=interface_name,
-                                                            operation_name=operation_name)
+        simple_before_task = api.task.OperationTask(
+            node,
+            interface_name=interface_name,
+            operation_name=operation_name)
+        simple_after_task = api.task.OperationTask(
+            node,
+            interface_name=interface_name,
+            operation_name=operation_name)
 
         inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph')
-        inner_task = api.task.OperationTask.for_node(node=node,
-                                                     interface_name=interface_name,
-                                                     operation_name=operation_name)
+        inner_task = api.task.OperationTask(
+            node,
+            interface_name=interface_name,
+            operation_name=operation_name)
         inner_task_graph.add_tasks(inner_task)
 
     test_task_graph.add_tasks(simple_before_task)
@@ -63,7 +67,8 @@ def test_task_graph_into_execution_graph(tmpdir):
     # Direct check
     execution_graph = DiGraph()
     core.translation.build_execution_graph(task_graph=test_task_graph,
-                                           execution_graph=execution_graph)
+                                           execution_graph=execution_graph,
+                                           default_executor=base.StubTaskExecutor())
     execution_tasks = topological_sort(execution_graph)
 
     assert len(execution_tasks) == 7

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 88e7ae0..1dbfae1 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -99,14 +99,16 @@ def _test(context, executor, lock_files, func, expected_failure):
     @workflow
     def mock_workflow(graph, **_):
         graph.add_tasks(
-            api.task.OperationTask.for_node(node=node,
-                                            interface_name=interface_name,
-                                            operation_name=operation_name,
-                                            inputs=inputs),
-            api.task.OperationTask.for_node(node=node,
-                                            interface_name=interface_name,
-                                            operation_name=operation_name,
-                                            inputs=inputs)
+            api.task.OperationTask(
+                node,
+                interface_name=interface_name,
+                operation_name=operation_name,
+                inputs=inputs),
+            api.task.OperationTask(
+                node,
+                interface_name=interface_name,
+                operation_name=operation_name,
+                inputs=inputs)
         )
 
     signal = events.on_failure_task_signal

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 7ae337d..878ac24 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -46,10 +46,11 @@ def test_decorate_extension(context, executor):
                                   inputs=inputs)
         )
         node.interfaces[interface.name] = interface
-        task = api.task.OperationTask.for_node(node=node,
-                                               interface_name=interface_name,
-                                               operation_name=operation_name,
-                                               inputs=inputs)
+        task = api.task.OperationTask(
+            node,
+            interface_name=interface_name,
+            operation_name=operation_name,
+            inputs=inputs)
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 3a8c54b..4fbe9c1 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -99,10 +99,11 @@ def _run_workflow(context, executor, op_func, inputs=None):
                                   inputs=wf_inputs)
         )
         node.interfaces[interface.name] = interface
-        task = api.task.OperationTask.for_node(node=node,
-                                               interface_name=interface_name,
-                                               operation_name=operation_name,
-                                               inputs=wf_inputs)
+        task = api.task.OperationTask(
+            node,
+            interface_name=interface_name,
+            operation_name=operation_name,
+            inputs=wf_inputs)
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8ca3ff29/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py
----------------------------------------------------------------------
diff --git a/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py b/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py
index abe1ee2..06e4f9e 100644
--- a/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py
+++ b/tests/resources/service-templates/tosca-simple-1.0/node-cellar/workflows.py
@@ -1,5 +1,5 @@
 from aria import workflow
-from aria.orchestrator.workflows.builtin import utils
+from aria.orchestrator.workflows.api import task
 from aria.orchestrator.workflows.exceptions import TaskException
 
 
@@ -16,9 +16,9 @@ def maintenance(ctx, graph, enabled):
 
     for node in ctx.model.node.iter():
         try:
-            graph.add_tasks(utils.create_node_task(node=node,
-                                                   interface_name=INTERFACE_NAME,
-                                                   operation_name=ENABLE_OPERATION_NAME if enabled
-                                                   else DISABLE_OPERATION_NAME))
+            graph.add_tasks(task.OperationTask(node,
+                                               interface_name=INTERFACE_NAME,
+                                               operation_name=ENABLE_OPERATION_NAME if enabled
+                                               else DISABLE_OPERATION_NAME))
         except TaskException:
             pass


Mime
View raw message