ariatosca-dev mailing list archives

From mxm...@apache.org
Subject incubator-ariatosca git commit: wip
Date Sun, 11 Jun 2017 16:05:39 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 5afa2f7fe -> f36fe86c1


wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/f36fe86c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/f36fe86c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/f36fe86c

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: f36fe86c17207a1349821ec599b349d94d5b2d6a
Parents: 5afa2f7
Author: max-orlov <maxim@gigaspaces.com>
Authored: Sun Jun 11 19:05:35 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Sun Jun 11 19:05:35 2017 +0300

----------------------------------------------------------------------
 aria/modeling/mixins.py                         |   4 +-
 aria/modeling/orchestration.py                  | 176 +++++--
 aria/orchestrator/workflows/core/__init__.py    |   2 +-
 aria/orchestrator/workflows/core/_task.py       | 265 ++++++++++
 aria/orchestrator/workflows/core/engine.py      |  20 +-
 .../workflows/core/events_handler.py            |  11 +-
 aria/orchestrator/workflows/core/task.py        | 269 ----------
 aria/orchestrator/workflows/core/translation.py |  55 +-
 aria/orchestrator/workflows/executor/thread.py  |   1 +
 tests/modeling/test_mixins.py                   |   8 +
 .../orchestrator/workflows/core/_test_engine.py | 519 ++++++++++++++++++
 .../orchestrator/workflows/core/test_engine.py  | 520 -------------------
 .../test_task_graph_into_execution_graph.py     |  37 +-
 13 files changed, 1002 insertions(+), 885 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/aria/modeling/mixins.py
----------------------------------------------------------------------
diff --git a/aria/modeling/mixins.py b/aria/modeling/mixins.py
index c98a866..31675fe 100644
--- a/aria/modeling/mixins.py
+++ b/aria/modeling/mixins.py
@@ -18,14 +18,12 @@ classes:
     * ModelMixin - abstract model implementation.
     * ModelIDMixin - abstract model implementation with IDs.
 """
-
 from sqlalchemy.ext import associationproxy
 from sqlalchemy import (
     Column,
     Integer,
     Text,
-    PickleType
-)
+    PickleType)
 
 from ..parser.consumption import ConsumptionContext
 from ..utils import console, collections, caching, formatting

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 995c8c2..36f1421 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -21,7 +21,6 @@ classes:
 """
 
 # pylint: disable=no-self-argument, no-member, abstract-method
-
 from datetime import datetime
 
 from sqlalchemy import (
@@ -34,19 +33,19 @@ from sqlalchemy import (
     String,
     Float,
     orm,
-)
+    PickleType)
 from sqlalchemy.ext.associationproxy import association_proxy
 from sqlalchemy.ext.declarative import declared_attr
 
 from ..orchestrator.exceptions import (TaskAbortException, TaskRetryException)
-from .mixins import ModelMixin, ParameterMixin
+from . import mixins
 from . import (
     relationship,
     types as modeling_types
 )
 
 
-class ExecutionBase(ModelMixin):
+class ExecutionBase(mixins.ModelMixin):
     """
     Execution model representation.
     """
@@ -152,7 +151,7 @@ class ExecutionBase(ModelMixin):
         )
 
 
-class PluginBase(ModelMixin):
+class PluginBase(mixins.ModelMixin):
     """
     An installed plugin.
 
@@ -213,7 +212,7 @@ class PluginBase(ModelMixin):
     uploaded_at = Column(DateTime, nullable=False, index=True)
 
 
-class TaskBase(ModelMixin):
+class TaskBase(mixins.ModelMixin):
     """
     Represents the smallest unit of stateful execution in ARIA. The task state includes inputs,
     outputs, as well as an atomic status, ensuring that the task can only be running once at any
@@ -257,10 +256,25 @@ class TaskBase(ModelMixin):
 
     __tablename__ = 'task'
 
-    __private_fields__ = ['node_fk',
-                          'relationship_fk',
-                          'plugin_fk',
-                          'execution_fk']
+    __private_fields__ = ['dependency_operation_task_fk', 'dependency_stub_task_fk', 'node_fk',
+                          'relationship_fk', 'plugin_fk', 'execution_fk']
+
+
+    START_WORKFLOW = 'start_workflow'
+    END_WORKFLOW = 'end_workflow'
+    START_SUBWORKFLOW = 'start_subworkflow'
+    END_SUBWORKFLOW = 'end_subworkflow'
+    STUB = 'stub'
+    CONDITIONAL = 'conditional'
+
+    STUB_TYPES = (
+        START_WORKFLOW,
+        START_SUBWORKFLOW,
+        END_WORKFLOW,
+        END_SUBWORKFLOW,
+        STUB,
+        CONDITIONAL,
+    )
 
     PENDING = 'pending'
     RETRYING = 'retrying'
@@ -276,10 +290,45 @@ class TaskBase(ModelMixin):
         SUCCESS,
         FAILED,
     )
-
     INFINITE_RETRIES = -1
 
     @declared_attr
+    def execution(cls):
+        return relationship.many_to_one(cls, 'execution')
+
+    @declared_attr
+    def execution_fk(cls):
+        return relationship.foreign_key('execution', nullable=True)
+
+    status = Column(Enum(*STATES, name='status'), default=PENDING)
+    due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow)
+    started_at = Column(DateTime, default=None)
+    ended_at = Column(DateTime, default=None)
+    attempts_count = Column(Integer, default=1)
+    api_id = Column(String)
+
+    _executor = Column(PickleType)
+    _executor_kwargs = Column(PickleType, default=None)
+
+    _context = Column(PickleType)
+    _context_kwargs = Column(PickleType, default=None)
+
+    @property
+    def executor(self):
+        return self._executor(**(self._executor_kwargs or {}))
+
+    @property
+    def context(self):
+        return self._context.instantiate_from_dict(task_id=self.id, **(self._context_kwargs or {}))
+
+    def execute(self):
+        executor = self.executor
+        try:
+            return executor.execute(self)
+        finally:
+            executor.close()
+
+    @declared_attr
     def logs(cls):
         return relationship.one_to_many(cls, 'log')
 
@@ -296,10 +345,6 @@ class TaskBase(ModelMixin):
         return relationship.many_to_one(cls, 'plugin')
 
     @declared_attr
-    def execution(cls):
-        return relationship.many_to_one(cls, 'execution')
-
-    @declared_attr
     def arguments(cls):
         return relationship.one_to_many(cls, 'argument', dict_key='name')
 
@@ -307,19 +352,10 @@ class TaskBase(ModelMixin):
     max_attempts = Column(Integer, default=1)
     retry_interval = Column(Float, default=0)
     ignore_failure = Column(Boolean, default=False)
+    interface_name = Column(String)
+    operation_name = Column(String)
 
-    # State
-    status = Column(Enum(*STATES, name='status'), default=PENDING)
-    due_at = Column(DateTime, nullable=False, index=True, default=datetime.utcnow())
-    started_at = Column(DateTime, default=None)
-    ended_at = Column(DateTime, default=None)
-    attempts_count = Column(Integer, default=1)
-
-    def has_ended(self):
-        return self.status in (self.SUCCESS, self.FAILED)
-
-    def is_waiting(self):
-        return self.status in (self.PENDING, self.RETRYING)
+    stub_type = Column(Enum(*STUB_TYPES))
 
     @property
     def actor(self):
@@ -351,10 +387,6 @@ class TaskBase(ModelMixin):
     def plugin_fk(cls):
         return relationship.foreign_key('plugin', nullable=True)
 
-    @declared_attr
-    def execution_fk(cls):
-        return relationship.foreign_key('execution', nullable=True)
-
     # endregion
 
     # region association proxies
@@ -376,14 +408,6 @@ class TaskBase(ModelMixin):
 
     # endregion
 
-    @classmethod
-    def for_node(cls, actor, **kwargs):
-        return cls(node=actor, **kwargs)
-
-    @classmethod
-    def for_relationship(cls, actor, **kwargs):
-        return cls(relationship=actor, **kwargs)
-
     @staticmethod
     def abort(message=None):
         raise TaskAbortException(message)
@@ -392,8 +416,78 @@ class TaskBase(ModelMixin):
     def retry(message=None, retry_interval=None):
         raise TaskRetryException(message, retry_interval=retry_interval)
 
+    @declared_attr
+    def operation_task_dependency_fk(cls):
+        """Self-referencing foreign key: a dependent task points at the task it depends on"""
+        return relationship.foreign_key('task', nullable=True)
+
+    @declared_attr
+    def dependent_tasks(cls):
+        return relationship.one_to_many_self(cls, 'operation_task_dependency_fk')
+
+    def has_ended(self):
+        if self.stub_type is not None:
+            return self.status == self.SUCCESS
+        else:
+            return self.status in (self.SUCCESS, self.FAILED)
 
-class LogBase(ModelMixin):
+    def is_waiting(self):
+        if self.stub_type is not None:
+            return not self.has_ended()
+        else:
+            return self.status in (self.PENDING, self.RETRYING)
+
+    @classmethod
+    def from_api_task(cls, api_task, executor, executor_kwargs=None):
+        from aria.modeling import models
+        from aria.orchestrator import context
+
+        instantiation_kwargs = {'_executor': executor,
+                                '_executor_kwargs': executor_kwargs}
+
+        if isinstance(api_task.actor, models.Node):
+            context_cls = context.operation.NodeOperationContext
+            instantiation_kwargs['node'] = api_task.actor
+        elif isinstance(api_task.actor, models.Relationship):
+            context_cls = context.operation.RelationshipOperationContext
+            instantiation_kwargs['relationship'] = api_task.actor
+        else:
+            raise RuntimeError('No operation context could be created for {actor.model_cls}'
+                               .format(actor=api_task.actor))
+
+        instantiation_kwargs.update(
+            {
+                'name': api_task.name,
+                'status': cls.PENDING,
+                'max_attempts': api_task.max_attempts,
+                'retry_interval': api_task.retry_interval,
+                'ignore_failure': api_task.ignore_failure,
+                'execution': api_task._workflow_context.execution,
+                'interface_name': api_task.interface_name,
+                'operation_name': api_task.operation_name,
+
+                # Only non-stub tasks have these fields
+                'plugin': api_task.plugin,
+                'function': api_task.function,
+                'arguments': api_task.arguments,
+                'api_id': api_task.id,
+
+                '_context': context_cls,
+                '_context_kwargs': {
+                    'name': api_task.name,
+                    'model_storage': api_task._workflow_context.model.serialization_dict,
+                    'resource_storage': api_task._workflow_context.resource.serialization_dict,
+                    'service_id': api_task._workflow_context._service_id,
+                    'actor_id': api_task.id,
+                    'execution_id': api_task._workflow_context._execution_id,
+                    'workdir': api_task._workflow_context._workdir
+                }
+        })
+
+        return cls(**instantiation_kwargs)
+
+
+class LogBase(mixins.ModelMixin):
 
     __tablename__ = 'log'
 
@@ -435,7 +529,7 @@ class LogBase(ModelMixin):
         return '{name}: {self.msg}'.format(name=name, self=self)
 
 
-class ArgumentBase(ParameterMixin):
+class ArgumentBase(mixins.ParameterMixin):
 
     __tablename__ = 'argument'
 

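A note on the mechanism above: the task row now stores the executor and
operation-context *classes* (via PickleType) together with their keyword
arguments, and the `executor`/`context` properties rebuild live objects on
demand. For illustration only, a self-contained sketch of that
lazy-instantiation pattern -- `FakeTask` and `EchoExecutor` are hypothetical
stand-ins, not part of this commit:

    class EchoExecutor(object):
        # hypothetical executor exposing the execute()/close() shape
        # that TaskBase.execute() expects
        def __init__(self, prefix='task'):
            self._prefix = prefix

        def execute(self, task):
            print('{0}: executing'.format(self._prefix))

        def close(self):
            pass


    class FakeTask(object):
        # hypothetical stand-in mirroring the _executor/_executor_kwargs columns
        def __init__(self, executor_cls, executor_kwargs=None):
            self._executor = executor_cls
            self._executor_kwargs = executor_kwargs

        @property
        def executor(self):
            return self._executor(**(self._executor_kwargs or {}))

        def execute(self):
            executor = self.executor
            try:
                return executor.execute(self)
            finally:
                executor.close()


    FakeTask(EchoExecutor, {'prefix': 'demo'}).execute()  # prints 'demo: executing'
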
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/aria/orchestrator/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/__init__.py b/aria/orchestrator/workflows/core/__init__.py
index e377153..98938be 100644
--- a/aria/orchestrator/workflows/core/__init__.py
+++ b/aria/orchestrator/workflows/core/__init__.py
@@ -17,4 +17,4 @@
 Core for the workflow execution mechanism
 """
 
-from . import task, translation, engine
+from . import translation, engine

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/aria/orchestrator/workflows/core/_task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/_task.py b/aria/orchestrator/workflows/core/_task.py
new file mode 100644
index 0000000..55db694
--- /dev/null
+++ b/aria/orchestrator/workflows/core/_task.py
@@ -0,0 +1,265 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow tasks
+"""
+
+from contextlib import contextmanager
+from datetime import datetime
+from functools import (
+    partial,
+    wraps,
+)
+
+
+from ....modeling import models
+from ...context import operation as operation_context
+from .. import exceptions
+
+
+def _locked(func=None):
+    if func is None:
+        return partial(_locked)
+
+    @wraps(func)
+    def _wrapper(self, value, **kwargs):
+        if self._update_fields is None:
+            raise exceptions.TaskException('Task is not in update mode')
+        return func(self, value, **kwargs)
+    return _wrapper
+
+
+class BaseTask(object):
+    """
+    Base class for Task objects
+    """
+
+    def __init__(self, id, executor, *args, **kwargs):
+        super(BaseTask, self).__init__(*args, **kwargs)
+        self._id = id
+        self._executor = executor
+
+    def execute(self):
+        return self._executor.execute(self)
+
+    @property
+    def id(self):
+        """
+        :return: the task's id
+        """
+        return self._id
+
+
+class StubTask(BaseTask):
+    """
+    Base stub task for marker tasks that only denote the start or end of a workflow
+    or sub-workflow
+    """
+    STARTED = models.Task.STARTED
+    SUCCESS = models.Task.SUCCESS
+
+    def __init__(self, *args, **kwargs):
+        super(StubTask, self).__init__(*args, **kwargs)
+        self.status = models.Task.PENDING
+        self.due_at = datetime.utcnow()
+
+
+
+
+class StartWorkflowTask(StubTask):
+    """
+    Task marking a workflow start
+    """
+    pass
+
+
+class EndWorkflowTask(StubTask):
+    """
+    Task marking a workflow end
+    """
+    pass
+
+
+class StartSubWorkflowTask(StubTask):
+    """
+    Task marking a subworkflow start
+    """
+    pass
+
+
+class EndSubWorkflowTask(StubTask):
+    """
+    Task marking a subworkflow end
+    """
+    pass
+
+
+class OperationTask(BaseTask):
+    """
+    Operation task
+    """
+    def __init__(self, api_task, *args, **kwargs):
+        # If no executor is provided, we infer that this is an empty task which does not need to be
+        # executed.
+        super(OperationTask, self).__init__(id=api_task.id, *args, **kwargs)
+        self._workflow_context = api_task._workflow_context
+        self.interface_name = api_task.interface_name
+        self.operation_name = api_task.operation_name
+        model_storage = api_task._workflow_context.model
+
+        base_task_model = model_storage.task.model_cls
+        if isinstance(api_task.actor, models.Node):
+            context_cls = operation_context.NodeOperationContext
+            create_task_model = base_task_model.for_node
+        elif isinstance(api_task.actor, models.Relationship):
+            context_cls = operation_context.RelationshipOperationContext
+            create_task_model = base_task_model.for_relationship
+        else:
+            raise RuntimeError('No operation context could be created for {actor.model_cls}'
+                               .format(actor=api_task.actor))
+
+        task_model = create_task_model(
+            name=api_task.name,
+            actor=api_task.actor,
+            status=base_task_model.PENDING,
+            max_attempts=api_task.max_attempts,
+            retry_interval=api_task.retry_interval,
+            ignore_failure=api_task.ignore_failure,
+            execution=self._workflow_context.execution,
+
+            # Only non-stub tasks have these fields
+            plugin=api_task.plugin,
+            function=api_task.function,
+            arguments=api_task.arguments
+        )
+        self._workflow_context.model.task.put(task_model)
+
+        self._ctx = context_cls(name=api_task.name,
+                                model_storage=self._workflow_context.model,
+                                resource_storage=self._workflow_context.resource,
+                                service_id=self._workflow_context._service_id,
+                                task_id=task_model.id,
+                                actor_id=api_task.actor.id,
+                                execution_id=self._workflow_context._execution_id,
+                                workdir=self._workflow_context._workdir)
+        self._task_id = task_model.id
+        self._update_fields = None
+
+    @contextmanager
+    def _update(self):
+        """
+        A context manager which puts the task into update mode, enabling field updates.
+        :yields: None
+        """
+        self._update_fields = {}
+        try:
+            yield
+            for key, value in self._update_fields.items():
+                setattr(self.model_task, key, value)
+            self.model_task = self.model_task
+        finally:
+            self._update_fields = None
+
+    @property
+    def model_task(self):
+        """
+        Returns the task model in storage
+        :return: task in storage
+        """
+        return self._workflow_context.model.task.get(self._task_id)
+
+    @model_task.setter
+    def model_task(self, value):
+        self._workflow_context.model.task.put(value)
+
+    @property
+    def context(self):
+        """
+        Contexts for the operation
+        :return:
+        """
+        return self._ctx
+
+    @property
+    def status(self):
+        """
+        Returns the task status
+        :return: task status
+        """
+        return self.model_task.status
+
+    @status.setter
+    @_locked
+    def status(self, value):
+        self._update_fields['status'] = value
+
+    @property
+    def started_at(self):
+        """
+        Returns when the task started
+        :return: when task started
+        """
+        return self.model_task.started_at
+
+    @started_at.setter
+    @_locked
+    def started_at(self, value):
+        self._update_fields['started_at'] = value
+
+    @property
+    def ended_at(self):
+        """
+        Returns when the task ended
+        :return: when task ended
+        """
+        return self.model_task.ended_at
+
+    @ended_at.setter
+    @_locked
+    def ended_at(self, value):
+        self._update_fields['ended_at'] = value
+
+    @property
+    def attempts_count(self):
+        """
+        Returns the attempts count for the task
+        :return: attempts count
+        """
+        return self.model_task.attempts_count
+
+    @attempts_count.setter
+    @_locked
+    def attempts_count(self, value):
+        self._update_fields['attempts_count'] = value
+
+    @property
+    def due_at(self):
+        """
+        Returns the minimum datetime in which the task can be executed
+        :return: eta
+        """
+        return self.model_task.due_at
+
+    @due_at.setter
+    @_locked
+    def due_at(self, value):
+        self._update_fields['due_at'] = value
+
+    def __getattr__(self, attr):
+        try:
+            return getattr(self.model_task, attr)
+        except AttributeError:
+            return super(OperationTask, self).__getattribute__(attr)

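The `_locked` decorator and `_update()` context manager above are carried over
verbatim from the old task.py: setters only buffer values while the task is in
update mode, and the buffer is flushed in a single put on exit. A minimal,
storage-free sketch of the same guard, for illustration:

    from contextlib import contextmanager
    from functools import wraps


    class TaskException(Exception):
        pass


    def _locked(func):
        @wraps(func)
        def _wrapper(self, value):
            if self._update_fields is None:
                raise TaskException('Task is not in update mode')
            return func(self, value)
        return _wrapper


    class GuardedTask(object):
        def __init__(self):
            self._update_fields = None
            self._status = 'pending'

        @contextmanager
        def _update(self):
            self._update_fields = {}
            try:
                yield
                # flush buffered writes at once (the real class puts them in storage)
                for key, value in self._update_fields.items():
                    setattr(self, '_' + key, value)
            finally:
                self._update_fields = None

        @property
        def status(self):
            return self._status

        @status.setter
        @_locked
        def status(self, value):
            self._update_fields['status'] = value


    task = GuardedTask()
    with task._update():
        task.status = 'started'         # buffered until the block exits
    assert task.status == 'started'
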
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 3a96804..120d83a 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -27,7 +27,6 @@ from aria.modeling import models
 from aria.orchestrator import events
 
 from .. import exceptions
-from . import task as engine_task
 from . import translation
 # Import required so all signals are registered
 from . import events_handler  # pylint: disable=unused-import
@@ -38,13 +37,18 @@ class Engine(logger.LoggerMixin):
     The workflow engine. Executes workflows
     """
 
-    def __init__(self, executor, workflow_context, tasks_graph, **kwargs):
+    def __init__(self, executor, workflow_context, tasks_graph, executor_kwargs=None, **kwargs):
         super(Engine, self).__init__(**kwargs)
         self._workflow_context = workflow_context
         self._execution_graph = networkx.DiGraph()
         translation.build_execution_graph(task_graph=tasks_graph,
                                           execution_graph=self._execution_graph,
-                                          default_executor=executor)
+                                          executor_kwargs=executor_kwargs,
+                                          default_executor=executor,
+                                          execution=workflow_context.execution)
+
+        # Re-assign to invoke the workflow context's setter, persisting the execution
+        workflow_context.execution = workflow_context.execution
 
     def execute(self):
         """
@@ -103,14 +107,14 @@ class Engine(logger.LoggerMixin):
     def _tasks_iter(self):
         for _, data in self._execution_graph.nodes_iter(data=True):
             task = data['task']
-            if isinstance(task, engine_task.OperationTask):
-                if not task.model_task.has_ended():
-                    self._workflow_context.model.task.refresh(task.model_task)
+            if isinstance(task, models.Task):
+                if not task.has_ended():
+                    self._workflow_context.model.task.refresh(task)
             yield task
 
     @staticmethod
     def _handle_executable_task(task):
-        if isinstance(task, engine_task.OperationTask):
+        if not task.stub_type:
             events.sent_task_signal.send(task)
         task.execute()
 
@@ -118,4 +122,4 @@ class Engine(logger.LoggerMixin):
         if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')
         else:
-            self._execution_graph.remove_node(task.id)
+            self._execution_graph.remove_node(task.api_id)

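Note the changed Engine contract: `executor` is now an executor class rather
than a live instance, with `executor_kwargs` carried separately so the pair
can be pickled onto each task. A hedged usage sketch (`pool_size` is an
assumed ThreadExecutor keyword, not verified by this diff):

    from aria.orchestrator.workflows.core import engine
    from aria.orchestrator.workflows.executor import thread


    def run(workflow_func, workflow_context):
        graph = workflow_func(ctx=workflow_context)
        eng = engine.Engine(executor=thread.ThreadExecutor,    # the class, not an instance
                            executor_kwargs={'pool_size': 1},  # assumed kwarg
                            workflow_context=workflow_context,
                            tasks_graph=graph)
        eng.execute()
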
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 669fb43..c733e79 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -31,15 +31,13 @@ from ... import exceptions
 
 @events.sent_task_signal.connect
 def _task_sent(task, *args, **kwargs):
-    with task._update():
-        task.status = task.SENT
+    task.status = task.SENT
 
 
 @events.start_task_signal.connect
 def _task_started(task, *args, **kwargs):
-    with task._update():
-        task.started_at = datetime.utcnow()
-        task.status = task.STARTED
+    task.started_at = datetime.utcnow()
+    task.status = task.STARTED
     _update_node_state_if_necessary(task, is_transitional=True)
 
 
@@ -136,8 +134,7 @@ def _workflow_cancelling(workflow_context, *args, **kwargs):
 def _update_node_state_if_necessary(task, is_transitional=False):
     # TODO: this is not the right way to check! the interface name is arbitrary
     # and also will *never* be the type name
-    model_task = task.model_task
-    node = model_task.node if model_task is not None else None
+    node = task.node
     if (node is not None) and \
         (task.interface_name in ('Standard', 'tosca.interfaces.node.lifecycle.Standard')):
         state = node.determine_state(op_name=task.operation_name, is_transitional=is_transitional)

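Since the model task itself now flows through the signals, handlers assign
fields directly instead of going through the old `_update()` context manager.
A sketch of a custom subscriber in the same style (illustrative only):

    from aria.orchestrator import events


    def on_task_started(task, *args, **kwargs):
        # `task` is the model task; its fields are read and written directly
        print('started {0} at {1}'.format(task.api_id, task.started_at))


    events.start_task_signal.connect(on_task_started)
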
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
deleted file mode 100644
index 72d83ea..0000000
--- a/aria/orchestrator/workflows/core/task.py
+++ /dev/null
@@ -1,269 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow tasks
-"""
-
-from contextlib import contextmanager
-from datetime import datetime
-from functools import (
-    partial,
-    wraps,
-)
-
-
-from ....modeling import models
-from ...context import operation as operation_context
-from .. import exceptions
-
-
-def _locked(func=None):
-    if func is None:
-        return partial(_locked, func=_locked)
-
-    @wraps(func)
-    def _wrapper(self, value, **kwargs):
-        if self._update_fields is None:
-            raise exceptions.TaskException('Task is not in update mode')
-        return func(self, value, **kwargs)
-    return _wrapper
-
-
-class BaseTask(object):
-    """
-    Base class for Task objects
-    """
-
-    def __init__(self, id, executor, *args, **kwargs):
-        super(BaseTask, self).__init__(*args, **kwargs)
-        self._id = id
-        self._executor = executor
-
-    def execute(self):
-        return self._executor.execute(self)
-
-    @property
-    def id(self):
-        """
-        :return: the task's id
-        """
-        return self._id
-
-
-class StubTask(BaseTask):
-    """
-    Base stub task for marker user tasks that only mark the start/end of a workflow
-    or sub-workflow
-    """
-    STARTED = models.Task.STARTED
-    SUCCESS = models.Task.SUCCESS
-
-    def __init__(self, *args, **kwargs):
-        super(StubTask, self).__init__(*args, **kwargs)
-        self.status = models.Task.PENDING
-        self.due_at = datetime.utcnow()
-
-    def has_ended(self):
-        return self.status == self.SUCCESS
-
-    def is_waiting(self):
-        return not self.has_ended()
-
-
-class StartWorkflowTask(StubTask):
-    """
-    Task marking a workflow start
-    """
-    pass
-
-
-class EndWorkflowTask(StubTask):
-    """
-    Task marking a workflow end
-    """
-    pass
-
-
-class StartSubWorkflowTask(StubTask):
-    """
-    Task marking a subworkflow start
-    """
-    pass
-
-
-class EndSubWorkflowTask(StubTask):
-    """
-    Task marking a subworkflow end
-    """
-    pass
-
-
-class OperationTask(BaseTask):
-    """
-    Operation task
-    """
-    def __init__(self, api_task, *args, **kwargs):
-        # If no executor is provided, we infer that this is an empty task which does not need to be
-        # executed.
-        super(OperationTask, self).__init__(id=api_task.id, *args, **kwargs)
-        self._workflow_context = api_task._workflow_context
-        self.interface_name = api_task.interface_name
-        self.operation_name = api_task.operation_name
-        model_storage = api_task._workflow_context.model
-
-        base_task_model = model_storage.task.model_cls
-        if isinstance(api_task.actor, models.Node):
-            context_cls = operation_context.NodeOperationContext
-            create_task_model = base_task_model.for_node
-        elif isinstance(api_task.actor, models.Relationship):
-            context_cls = operation_context.RelationshipOperationContext
-            create_task_model = base_task_model.for_relationship
-        else:
-            raise RuntimeError('No operation context could be created for {actor.model_cls}'
-                               .format(actor=api_task.actor))
-
-        task_model = create_task_model(
-            name=api_task.name,
-            actor=api_task.actor,
-            status=base_task_model.PENDING,
-            max_attempts=api_task.max_attempts,
-            retry_interval=api_task.retry_interval,
-            ignore_failure=api_task.ignore_failure,
-            execution=self._workflow_context.execution,
-
-            # Only non-stub tasks have these fields
-            plugin=api_task.plugin,
-            function=api_task.function,
-            arguments=api_task.arguments
-        )
-        self._workflow_context.model.task.put(task_model)
-
-        self._ctx = context_cls(name=api_task.name,
-                                model_storage=self._workflow_context.model,
-                                resource_storage=self._workflow_context.resource,
-                                service_id=self._workflow_context._service_id,
-                                task_id=task_model.id,
-                                actor_id=api_task.actor.id,
-                                execution_id=self._workflow_context._execution_id,
-                                workdir=self._workflow_context._workdir)
-        self._task_id = task_model.id
-        self._update_fields = None
-
-    @contextmanager
-    def _update(self):
-        """
-        A context manager which puts the task into update mode, enabling fields update.
-        :yields: None
-        """
-        self._update_fields = {}
-        try:
-            yield
-            for key, value in self._update_fields.items():
-                setattr(self.model_task, key, value)
-            self.model_task = self.model_task
-        finally:
-            self._update_fields = None
-
-    @property
-    def model_task(self):
-        """
-        Returns the task model in storage
-        :return: task in storage
-        """
-        return self._workflow_context.model.task.get(self._task_id)
-
-    @model_task.setter
-    def model_task(self, value):
-        self._workflow_context.model.task.put(value)
-
-    @property
-    def context(self):
-        """
-        Contexts for the operation
-        :return:
-        """
-        return self._ctx
-
-    @property
-    def status(self):
-        """
-        Returns the task status
-        :return: task status
-        """
-        return self.model_task.status
-
-    @status.setter
-    @_locked
-    def status(self, value):
-        self._update_fields['status'] = value
-
-    @property
-    def started_at(self):
-        """
-        Returns when the task started
-        :return: when task started
-        """
-        return self.model_task.started_at
-
-    @started_at.setter
-    @_locked
-    def started_at(self, value):
-        self._update_fields['started_at'] = value
-
-    @property
-    def ended_at(self):
-        """
-        Returns when the task ended
-        :return: when task ended
-        """
-        return self.model_task.ended_at
-
-    @ended_at.setter
-    @_locked
-    def ended_at(self, value):
-        self._update_fields['ended_at'] = value
-
-    @property
-    def attempts_count(self):
-        """
-        Returns the attempts count for the task
-        :return: attempts count
-        """
-        return self.model_task.attempts_count
-
-    @attempts_count.setter
-    @_locked
-    def attempts_count(self, value):
-        self._update_fields['attempts_count'] = value
-
-    @property
-    def due_at(self):
-        """
-        Returns the minimum datetime in which the task can be executed
-        :return: eta
-        """
-        return self.model_task.due_at
-
-    @due_at.setter
-    @_locked
-    def due_at(self, value):
-        self._update_fields['due_at'] = value
-
-    def __getattr__(self, attr):
-        try:
-            return getattr(self.model_task, attr)
-        except AttributeError:
-            return super(OperationTask, self).__getattribute__(attr)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index fec108b..50fd65b 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -17,29 +17,34 @@
 Translation of user graph's API to the execution graph
 """
 
+from ....modeling import models
 from .. import api
 from ..executor import base
-from . import task as core_task
 
 
 def build_execution_graph(
         task_graph,
         execution_graph,
         default_executor,
-        start_cls=core_task.StartWorkflowTask,
-        end_cls=core_task.EndWorkflowTask,
+        execution,
+        executor_kwargs=None,
+        start_stub_type=models.Task.START_WORKFLOW,
+        end_stub_type=models.Task.END_WORKFLOW,
         depends_on=()):
     """
     Translates the user graph to the execution graph
     :param task_graph: The user's graph
-    :param workflow_context: The workflow
+    :param default_executor: the executor class used for operation tasks
     :param execution_graph: The execution graph that is being built
-    :param start_cls: internal use
-    :param end_cls: internal use
+    :param start_stub_type: internal use
+    :param end_stub_type: internal use
     :param depends_on: internal use
     """
     # Insert start marker
-    start_task = start_cls(id=_start_graph_suffix(task_graph.id), executor=base.StubTaskExecutor())
+    start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
+                             _executor=base.StubTaskExecutor,
+                             execution=execution,
+                             stub_type=start_stub_type)
     _add_task_and_dependencies(execution_graph, start_task, depends_on)
 
     for api_task in task_graph.topological_order(reverse=True):
@@ -48,7 +53,9 @@ def build_execution_graph(
             execution_graph, dependencies, default=[start_task])
 
         if isinstance(api_task, api.task.OperationTask):
-            operation_task = core_task.OperationTask(api_task, executor=default_executor)
+            operation_task = models.Task.from_api_task(api_task=api_task,
+                                                       executor=default_executor,
+                                                       executor_kwargs=executor_kwargs)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
@@ -56,12 +63,17 @@ def build_execution_graph(
                 task_graph=api_task,
                 execution_graph=execution_graph,
                 default_executor=default_executor,
-                start_cls=core_task.StartSubWorkflowTask,
-                end_cls=core_task.EndSubWorkflowTask,
+                execution=execution,
+                executor_kwargs=executor_kwargs,
+                start_stub_type=models.Task.START_SUBWORKFLOW,
+                end_stub_type=models.Task.END_SUBWORKFLOW,
                 depends_on=operation_dependencies
             )
         elif isinstance(api_task, api.task.StubTask):
-            stub_task = core_task.StubTask(id=api_task.id, executor=base.StubTaskExecutor())
+            stub_task = models.Task(api_id=api_task.id,
+                                    _executor=base.StubTaskExecutor,
+                                    execution=execution,
+                                    stub_type=models.Task.STUB)
             _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
         else:
             raise RuntimeError('Undefined state')
@@ -71,14 +83,17 @@ def build_execution_graph(
         execution_graph,
         _get_non_dependency_tasks(task_graph),
         default=[start_task])
-    end_task = end_cls(id=_end_graph_suffix(task_graph.id), executor=base.StubTaskExecutor())
+    end_task = models.Task(api_id=_end_graph_suffix(task_graph.id),
+                           _executor=base.StubTaskExecutor,
+                           execution=execution,
+                           stub_type=end_stub_type)
     _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies)
 
 
 def _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()):
-    execution_graph.add_node(operation_task.id, task=operation_task)
+    execution_graph.add_node(operation_task.api_id, task=operation_task)
     for dependency in operation_dependencies:
-        execution_graph.add_edge(dependency.id, operation_task.id)
+        execution_graph.add_edge(dependency.api_id, operation_task.api_id)
 
 
 def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
@@ -88,19 +103,19 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
     tasks = []
     for dependency in dependencies:
         if isinstance(dependency, (api.task.OperationTask, api.task.StubTask)):
-            dependency_id = dependency.id
+            dependency_name = dependency.id
         else:
-            dependency_id = _end_graph_suffix(dependency.id)
-        tasks.append(execution_graph.node[dependency_id]['task'])
+            dependency_name = _end_graph_suffix(dependency.id)
+        tasks.append(execution_graph.node[dependency_name]['task'])
     return tasks or default
 
 
-def _start_graph_suffix(id):
-    return '{0}-Start'.format(id)
+def _start_graph_suffix(api_id):
+    return '{0}-Start'.format(api_id)
 
 
-def _end_graph_suffix(id):
-    return '{0}-End'.format(id)
+def _end_graph_suffix(api_id):
+    return '{0}-End'.format(api_id)
 
 
 def _get_non_dependency_tasks(graph):

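With the stub task classes gone, every node in the execution graph is a Task
row keyed by `api_id`, and `stub_type` distinguishes markers from operations.
For orientation, the graph built for a single-operation workflow reduces to
something like the following hand-written miniature (placeholder strings stand
in for the actual Task rows):

    import networkx

    graph = networkx.DiGraph()
    graph.add_node('wf1-Start', task='Task(stub_type=start_workflow)')
    graph.add_node('op1', task='Task(function=..., stub_type=None)')
    graph.add_node('wf1-End', task='Task(stub_type=end_workflow)')
    graph.add_edge('wf1-Start', 'op1')   # edges run dependency -> dependent
    graph.add_edge('op1', 'wf1-End')
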
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 56a56a5..a44499e 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -58,6 +58,7 @@ class ThreadExecutor(BaseExecutor):
         while not self._stopped:
             try:
                 task = self._queue.get(timeout=1)
+                task = task.context.task
                 self._task_started(task)
                 try:
                     task_func = imports.load_attribute(task.function)

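The single added line deserves a note: the object pulled off the queue carries
a pickled operation context, and `task.context.task` appears to re-read the
Task row through that context's own model storage, yielding a thread-local,
session-bound instance. Schematically (hypothetical stand-ins, not real API):

    class _FakeContext(object):
        @property
        def task(self):
            return 'task re-read from storage by task_id'


    class _QueuedTask(object):
        context = _FakeContext()


    task = _QueuedTask()
    task = task.context.task  # same shape as the line added above
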
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/tests/modeling/test_mixins.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_mixins.py b/tests/modeling/test_mixins.py
index 2c91a4b..4101fff 100644
--- a/tests/modeling/test_mixins.py
+++ b/tests/modeling/test_mixins.py
@@ -216,3 +216,11 @@ def test_strict_list():
     assert_strict(strict_class)
     with pytest.raises(ValueFormatException):
         strict_class.strict_list[0] = 1
+
+
+def test_task():
+    from aria.orchestrator.workflows.executor import process
+    task = modeling.models.Task(stub_type=modeling.models.Task.STUB)
+    task._executor = process.ProcessExecutor
+    # the `executor` property instantiates the pickled class on demand
+    assert task._executor is process.ProcessExecutor
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/tests/orchestrator/workflows/core/_test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/_test_engine.py b/tests/orchestrator/workflows/core/_test_engine.py
new file mode 100644
index 0000000..7ffb92a
--- /dev/null
+++ b/tests/orchestrator/workflows/core/_test_engine.py
@@ -0,0 +1,519 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import time
+import threading
+from datetime import datetime
+
+import pytest
+
+from aria.orchestrator import (
+    events,
+    workflow,
+    operation,
+)
+from aria.modeling import models
+from aria.orchestrator.workflows import (
+    api,
+    exceptions,
+)
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.executor import thread
+
+from tests import mock, storage
+
+
+global_test_holder = {}
+
+
+class BaseTest(object):
+
+    @classmethod
+    def _execute(cls, workflow_func, workflow_context, executor, executor_kwargs=None):
+        eng = cls._engine(workflow_func=workflow_func,
+                          workflow_context=workflow_context,
+                          executor=executor,
+                          executor_kwargs=executor_kwargs)
+        eng.execute()
+        return eng
+
+    @staticmethod
+    def _engine(workflow_func, workflow_context, executor, executor_kwargs=None):
+        graph = workflow_func(ctx=workflow_context)
+        return engine.Engine(executor=executor,
+                             executor_kwargs=executor_kwargs,
+                             workflow_context=workflow_context,
+                             tasks_graph=graph)
+
+    @staticmethod
+    def _op(ctx,
+            func,
+            arguments=None,
+            max_attempts=None,
+            retry_interval=None,
+            ignore_failure=None):
+        node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+        interface_name = 'aria.interfaces.lifecycle'
+        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
+            name=__name__, func=func))
+        if arguments:
+            # the operation has to declare the arguments before those may be passed
+            operation_kwargs['arguments'] = arguments
+        operation_name = 'create'
+        interface = mock.models.create_interface(node.service, interface_name, operation_name,
+                                                 operation_kwargs=operation_kwargs)
+        node.interfaces[interface.name] = interface
+
+        return api.task.OperationTask(
+            node,
+            interface_name='aria.interfaces.lifecycle',
+            operation_name=operation_name,
+            arguments=arguments,
+            max_attempts=max_attempts,
+            retry_interval=retry_interval,
+            ignore_failure=ignore_failure,
+        )
+
+    @pytest.fixture(autouse=True)
+    def globals_cleanup(self):
+        try:
+            yield
+        finally:
+            global_test_holder.clear()
+
+    @pytest.fixture(autouse=True)
+    def signals_registration(self):
+        def sent_task_handler(task, *args, **kwargs):
+            if task.stub_type is None:
+                calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
+                global_test_holder['sent_task_signal_calls'] = calls + 1
+
+        def start_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('start')
+
+        def success_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('success')
+
+        def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
+            workflow_context.states.append('failure')
+            workflow_context.exception = exception
+
+        def cancel_workflow_handler(workflow_context, *args, **kwargs):
+            workflow_context.states.append('cancel')
+
+        events.start_workflow_signal.connect(start_workflow_handler)
+        events.on_success_workflow_signal.connect(success_workflow_handler)
+        events.on_failure_workflow_signal.connect(failure_workflow_handler)
+        events.on_cancelled_workflow_signal.connect(cancel_workflow_handler)
+        events.sent_task_signal.connect(sent_task_handler)
+        try:
+            yield
+        finally:
+            events.start_workflow_signal.disconnect(start_workflow_handler)
+            events.on_success_workflow_signal.disconnect(success_workflow_handler)
+            events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
+            events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler)
+            events.sent_task_signal.disconnect(sent_task_handler)
+
+    @pytest.fixture
+    def executor(self):
+        return thread.ThreadExecutor
+
+    @pytest.fixture
+    def workflow_context(self, tmpdir):
+        workflow_context = mock.context.simple(str(tmpdir))
+        workflow_context.states = []
+        workflow_context.exception = None
+        yield workflow_context
+        storage.release_sqlite_storage(workflow_context.model)
+
+
+class TestEngine(BaseTest):
+
+    def test_empty_graph_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(**_):
+            pass
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert 'sent_task_signal_calls' not in global_test_holder
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is None
+        assert execution.status == models.Execution.SUCCEEDED
+
+    def test_single_task_successful_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(ctx, func=mock_success_task))
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+    def test_single_task_failed_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(ctx, func=mock_failed_task))
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is not None
+        assert execution.status == models.Execution.FAILED
+
+    def test_two_tasks_execution_order(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
+            op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
+            graph.sequence(op1, op2)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('invocations') == [1, 2]
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_stub_and_subworkflow_execution(self, workflow_context, executor):
+        @workflow
+        def sub_workflow(ctx, graph):
+            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
+            op2 = api.task.StubTask()
+            op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
+            graph.sequence(op1, op2, op3)
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('invocations') == [1, 2]
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+
+class TestCancel(BaseTest):
+
+    def test_cancel_started_execution(self, workflow_context, executor):
+        number_of_tasks = 100
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            operations = (
+                self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1))
+                for _ in range(number_of_tasks)
+            )
+            return graph.sequence(*operations)
+
+        eng = self._engine(workflow_func=mock_workflow,
+                           workflow_context=workflow_context,
+                           executor=executor)
+        t = threading.Thread(target=eng.execute)
+        t.start()
+        time.sleep(10)
+        eng.cancel_execution()
+        t.join(timeout=60)  # give this a *lot* of time, since Travis can be very slow
+        assert not t.is_alive()  # join() does not raise on timeout, so check explicitly
+        assert workflow_context.states == ['start', 'cancel']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert 0 < len(invocations) < number_of_tasks
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is None
+        assert execution.status == models.Execution.CANCELLED
+
+    def test_cancel_pending_execution(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(graph, **_):
+            return graph
+        eng = self._engine(workflow_func=mock_workflow,
+                           workflow_context=workflow_context,
+                           executor=executor)
+        eng.cancel_execution()
+        execution = workflow_context.execution
+        assert execution.status == models.Execution.CANCELLED
+
+
+class TestRetries(BaseTest):
+
+    def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 1},
+                          max_attempts=2)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 2},
+                          max_attempts=2)
+            graph.add_tasks(op)
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 1},
+                          max_attempts=3)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 2},
+                          max_attempts=3)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 3
+        assert global_test_holder.get('sent_task_signal_calls') == 3
+
+    def test_infinite_retries(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 1},
+                          max_attempts=-1)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert len(global_test_holder.get('invocations', [])) == 2
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_retry_interval_float(self, workflow_context, executor):
+        self._test_retry_interval(retry_interval=0.3,
+                                  workflow_context=workflow_context,
+                                  executor=executor)
+
+    def test_retry_interval_int(self, workflow_context, executor):
+        self._test_retry_interval(retry_interval=1,
+                                  workflow_context=workflow_context,
+                                  executor=executor)
+
+    def _test_retry_interval(self, retry_interval, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          arguments={'failure_count': 1},
+                          max_attempts=2,
+                          retry_interval=retry_interval)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 2
+        invocation1, invocation2 = invocations
+        assert invocation2 - invocation1 >= retry_interval
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_ignore_failure(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
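+            # With ignore_failure=True the failed attempt is not retried and the
+            # workflow still completes successfully, despite max_attempts=100.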
+            op = self._op(ctx, func=mock_conditional_failure_task,
+                          ignore_failure=True,
+                          arguments={'failure_count': 100},
+                          max_attempts=100)
+            graph.add_tasks(op)
+        self._execute(
+            workflow_func=mock_workflow,
+            workflow_context=workflow_context,
+            executor=executor)
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 1
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+
+class TestTaskRetryAndAbort(BaseTest):
+    message = 'EXPECTED_ERROR'
+
+    def test_task_retry_default_interval(self, workflow_context, executor):
+        default_retry_interval = 0.1
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_task_retry,
+                          arguments={'message': self.message},
+                          retry_interval=default_retry_interval,
+                          max_attempts=2)
+            graph.add_tasks(op)
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 2
+        invocation1, invocation2 = invocations
+        assert invocation2 - invocation1 >= default_retry_interval
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_task_retry_custom_interval(self, workflow_context, executor):
+        default_retry_interval = 100
+        custom_retry_interval = 0.1
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_task_retry,
+                          arguments={'message': self.message,
+                                     'retry_interval': custom_retry_interval},
+                          retry_interval=default_retry_interval,
+                          max_attempts=2)
+            graph.add_tasks(op)
+        execution_start = time.time()
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        execution_end = time.time()
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 2
+        assert (execution_end - execution_start) < default_retry_interval
+        assert global_test_holder.get('sent_task_signal_calls') == 2
+
+    def test_task_abort(self, workflow_context, executor):
+        @workflow
+        def mock_workflow(ctx, graph):
+            op = self._op(ctx, func=mock_task_abort,
+                          arguments={'message': self.message},
+                          retry_interval=100,
+                          max_attempts=100)
+            graph.add_tasks(op)
+        with pytest.raises(exceptions.ExecutorException):
+            self._execute(
+                workflow_func=mock_workflow,
+                workflow_context=workflow_context,
+                executor=executor)
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
+        invocations = global_test_holder.get('invocations', [])
+        assert len(invocations) == 1
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
+
+@operation
+def mock_success_task(**_):
+    pass
+
+
+@operation
+def mock_failed_task(**_):
+    raise RuntimeError
+
+
+@operation
+def mock_ordered_task(counter, **_):
+    invocations = global_test_holder.setdefault('invocations', [])
+    invocations.append(counter)
+
+
+@operation
+def mock_conditional_failure_task(failure_count, **_):
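+    # Fail until `failure_count` invocations have been recorded; a timestamp is
+    # appended on every call so tests can measure retry intervals.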
+    invocations = global_test_holder.setdefault('invocations', [])
+    try:
+        if len(invocations) < failure_count:
+            raise RuntimeError
+    finally:
+        invocations.append(time.time())
+
+
+@operation
+def mock_sleep_task(seconds, **_):
+    _add_invocation_timestamp()
+    time.sleep(seconds)
+
+
+@operation
+def mock_task_retry(ctx, message, retry_interval=None, **_):
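+    # Ask the orchestrator to retry this task, optionally overriding the
+    # task-level retry interval.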
+    _add_invocation_timestamp()
+    retry_kwargs = {}
+    if retry_interval is not None:
+        retry_kwargs['retry_interval'] = retry_interval
+    ctx.task.retry(message, **retry_kwargs)
+
+
+@operation
+def mock_task_abort(ctx, message, **_):
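+    # Abort the task outright: it is marked failed and no retries occur, even
+    # when more attempts remain.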
+    _add_invocation_timestamp()
+    ctx.task.abort(message)
+
+
+def _add_invocation_timestamp():
+    invocations = global_test_holder.setdefault('invocations', [])
+    invocations.append(time.time())

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
deleted file mode 100644
index 6d2836c..0000000
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ /dev/null
@@ -1,520 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-import time
-import threading
-from datetime import datetime
-
-import pytest
-
-from aria.orchestrator import (
-    events,
-    workflow,
-    operation,
-)
-from aria.modeling import models
-from aria.orchestrator.workflows import (
-    api,
-    exceptions,
-)
-from aria.orchestrator.workflows.core import engine
-from aria.orchestrator.workflows.executor import thread
-
-from tests import mock, storage
-
-
-global_test_holder = {}
-
-
-class BaseTest(object):
-
-    @classmethod
-    def _execute(cls, workflow_func, workflow_context, executor):
-        eng = cls._engine(workflow_func=workflow_func,
-                          workflow_context=workflow_context,
-                          executor=executor)
-        eng.execute()
-        return eng
-
-    @staticmethod
-    def _engine(workflow_func, workflow_context, executor):
-        graph = workflow_func(ctx=workflow_context)
-        return engine.Engine(executor=executor,
-                             workflow_context=workflow_context,
-                             tasks_graph=graph)
-
-    @staticmethod
-    def _op(ctx,
-            func,
-            arguments=None,
-            max_attempts=None,
-            retry_interval=None,
-            ignore_failure=None):
-        node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
-        interface_name = 'aria.interfaces.lifecycle'
-        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
-            name=__name__, func=func))
-        if arguments:
-            # the operation has to declare the arguments before those may be passed
-            operation_kwargs['arguments'] = arguments
-        operation_name = 'create'
-        interface = mock.models.create_interface(node.service, interface_name, operation_name,
-                                                 operation_kwargs=operation_kwargs)
-        node.interfaces[interface.name] = interface
-
-        return api.task.OperationTask(
-            node,
-            interface_name='aria.interfaces.lifecycle',
-            operation_name=operation_name,
-            arguments=arguments,
-            max_attempts=max_attempts,
-            retry_interval=retry_interval,
-            ignore_failure=ignore_failure,
-        )
-
-    @pytest.fixture(autouse=True)
-    def globals_cleanup(self):
-        try:
-            yield
-        finally:
-            global_test_holder.clear()
-
-    @pytest.fixture(autouse=True)
-    def signals_registration(self, ):
-        def sent_task_handler(*args, **kwargs):
-            calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
-            global_test_holder['sent_task_signal_calls'] = calls + 1
-
-        def start_workflow_handler(workflow_context, *args, **kwargs):
-            workflow_context.states.append('start')
-
-        def success_workflow_handler(workflow_context, *args, **kwargs):
-            workflow_context.states.append('success')
-
-        def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
-            workflow_context.states.append('failure')
-            workflow_context.exception = exception
-
-        def cancel_workflow_handler(workflow_context, *args, **kwargs):
-            workflow_context.states.append('cancel')
-
-        events.start_workflow_signal.connect(start_workflow_handler)
-        events.on_success_workflow_signal.connect(success_workflow_handler)
-        events.on_failure_workflow_signal.connect(failure_workflow_handler)
-        events.on_cancelled_workflow_signal.connect(cancel_workflow_handler)
-        events.sent_task_signal.connect(sent_task_handler)
-        try:
-            yield
-        finally:
-            events.start_workflow_signal.disconnect(start_workflow_handler)
-            events.on_success_workflow_signal.disconnect(success_workflow_handler)
-            events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
-            events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler)
-            events.sent_task_signal.disconnect(sent_task_handler)
-
-    @pytest.fixture
-    def executor(self):
-        result = thread.ThreadExecutor()
-        try:
-            yield result
-        finally:
-            result.close()
-
-    @pytest.fixture
-    def workflow_context(self, tmpdir):
-        workflow_context = mock.context.simple(str(tmpdir))
-        workflow_context.states = []
-        workflow_context.exception = None
-        yield workflow_context
-        storage.release_sqlite_storage(workflow_context.model)
-
-
-class TestEngine(BaseTest):
-
-    def test_empty_graph_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(**_):
-            pass
-        self._execute(workflow_func=mock_workflow,
-                      workflow_context=workflow_context,
-                      executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert 'sent_task_signal_calls' not in global_test_holder
-        execution = workflow_context.execution
-        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
-        assert execution.error is None
-        assert execution.status == models.Execution.SUCCEEDED
-
-    def test_single_task_successful_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            graph.add_tasks(self._op(ctx, func=mock_success_task))
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert global_test_holder.get('sent_task_signal_calls') == 1
-
-    def test_single_task_failed_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            graph.add_tasks(self._op(ctx, func=mock_failed_task))
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
-        assert global_test_holder.get('sent_task_signal_calls') == 1
-        execution = workflow_context.execution
-        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
-        assert execution.error is not None
-        assert execution.status == models.Execution.FAILED
-
-    def test_two_tasks_execution_order(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
-            op2 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
-            graph.sequence(op1, op2)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert global_test_holder.get('invocations') == [1, 2]
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_stub_and_subworkflow_execution(self, workflow_context, executor):
-        @workflow
-        def sub_workflow(ctx, graph):
-            op1 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 1})
-            op2 = api.task.StubTask()
-            op3 = self._op(ctx, func=mock_ordered_task, arguments={'counter': 2})
-            graph.sequence(op1, op2, op3)
-
-        @workflow
-        def mock_workflow(ctx, graph):
-            graph.add_tasks(api.task.WorkflowTask(sub_workflow, ctx=ctx))
-        self._execute(workflow_func=mock_workflow,
-                      workflow_context=workflow_context,
-                      executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert global_test_holder.get('invocations') == [1, 2]
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-
-class TestCancel(BaseTest):
-
-    def test_cancel_started_execution(self, workflow_context, executor):
-        number_of_tasks = 100
-
-        @workflow
-        def mock_workflow(ctx, graph):
-            operations = (
-                self._op(ctx, func=mock_sleep_task, arguments=dict(seconds=0.1))
-                for _ in range(number_of_tasks)
-            )
-            return graph.sequence(*operations)
-
-        eng = self._engine(workflow_func=mock_workflow,
-                           workflow_context=workflow_context,
-                           executor=executor)
-        t = threading.Thread(target=eng.execute)
-        t.start()
-        time.sleep(10)
-        eng.cancel_execution()
-        t.join(timeout=60) # we need to give this a *lot* of time because Travis can be *very* slow
-        assert not t.is_alive() # if join is timed out it will not raise an exception
-        assert workflow_context.states == ['start', 'cancel']
-        assert workflow_context.exception is None
-        invocations = global_test_holder.get('invocations', [])
-        assert 0 < len(invocations) < number_of_tasks
-        execution = workflow_context.execution
-        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
-        assert execution.error is None
-        assert execution.status == models.Execution.CANCELLED
-
-    def test_cancel_pending_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(graph, **_):
-            return graph
-        eng = self._engine(workflow_func=mock_workflow,
-                           workflow_context=workflow_context,
-                           executor=executor)
-        eng.cancel_execution()
-        execution = workflow_context.execution
-        assert execution.status == models.Execution.CANCELLED
-
-
-class TestRetries(BaseTest):
-
-    def test_two_max_attempts_and_success_on_retry(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 1},
-                          max_attempts=2)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert len(global_test_holder.get('invocations', [])) == 2
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_two_max_attempts_and_failure_on_retry(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 2},
-                          max_attempts=2)
-            graph.add_tasks(op)
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
-        assert len(global_test_holder.get('invocations', [])) == 2
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_three_max_attempts_and_success_on_first_retry(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 1},
-                          max_attempts=3)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert len(global_test_holder.get('invocations', [])) == 2
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_three_max_attempts_and_success_on_second_retry(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 2},
-                          max_attempts=3)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert len(global_test_holder.get('invocations', [])) == 3
-        assert global_test_holder.get('sent_task_signal_calls') == 3
-
-    def test_infinite_retries(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 1},
-                          max_attempts=-1)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        assert len(global_test_holder.get('invocations', [])) == 2
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_retry_interval_float(self, workflow_context, executor):
-        self._test_retry_interval(retry_interval=0.3,
-                                  workflow_context=workflow_context,
-                                  executor=executor)
-
-    def test_retry_interval_int(self, workflow_context, executor):
-        self._test_retry_interval(retry_interval=1,
-                                  workflow_context=workflow_context,
-                                  executor=executor)
-
-    def _test_retry_interval(self, retry_interval, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          arguments={'failure_count': 1},
-                          max_attempts=2,
-                          retry_interval=retry_interval)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        invocations = global_test_holder.get('invocations', [])
-        assert len(invocations) == 2
-        invocation1, invocation2 = invocations
-        assert invocation2 - invocation1 >= retry_interval
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_ignore_failure(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_conditional_failure_task,
-                          ignore_failure=True,
-                          arguments={'failure_count': 100},
-                          max_attempts=100)
-            graph.add_tasks(op)
-        self._execute(
-            workflow_func=mock_workflow,
-            workflow_context=workflow_context,
-            executor=executor)
-        assert workflow_context.states == ['start', 'success']
-        assert workflow_context.exception is None
-        invocations = global_test_holder.get('invocations', [])
-        assert len(invocations) == 1
-        assert global_test_holder.get('sent_task_signal_calls') == 1
-
-
-class TestTaskRetryAndAbort(BaseTest):
-    message = 'EXPECTED_ERROR'
-
-    def test_task_retry_default_interval(self, workflow_context, executor):
-        default_retry_interval = 0.1
-
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_task_retry,
-                          arguments={'message': self.message},
-                          retry_interval=default_retry_interval,
-                          max_attempts=2)
-            graph.add_tasks(op)
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
-        invocations = global_test_holder.get('invocations', [])
-        assert len(invocations) == 2
-        invocation1, invocation2 = invocations
-        assert invocation2 - invocation1 >= default_retry_interval
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_task_retry_custom_interval(self, workflow_context, executor):
-        default_retry_interval = 100
-        custom_retry_interval = 0.1
-
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_task_retry,
-                          arguments={'message': self.message,
-                                     'retry_interval': custom_retry_interval},
-                          retry_interval=default_retry_interval,
-                          max_attempts=2)
-            graph.add_tasks(op)
-        execution_start = time.time()
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        execution_end = time.time()
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
-        invocations = global_test_holder.get('invocations', [])
-        assert len(invocations) == 2
-        assert (execution_end - execution_start) < default_retry_interval
-        assert global_test_holder.get('sent_task_signal_calls') == 2
-
-    def test_task_abort(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(ctx, graph):
-            op = self._op(ctx, func=mock_task_abort,
-                          arguments={'message': self.message},
-                          retry_interval=100,
-                          max_attempts=100)
-            graph.add_tasks(op)
-        with pytest.raises(exceptions.ExecutorException):
-            self._execute(
-                workflow_func=mock_workflow,
-                workflow_context=workflow_context,
-                executor=executor)
-        assert workflow_context.states == ['start', 'failure']
-        assert isinstance(workflow_context.exception, exceptions.ExecutorException)
-        invocations = global_test_holder.get('invocations', [])
-        assert len(invocations) == 1
-        assert global_test_holder.get('sent_task_signal_calls') == 1
-
-
-@operation
-def mock_success_task(**_):
-    pass
-
-
-@operation
-def mock_failed_task(**_):
-    raise RuntimeError
-
-
-@operation
-def mock_ordered_task(counter, **_):
-    invocations = global_test_holder.setdefault('invocations', [])
-    invocations.append(counter)
-
-
-@operation
-def mock_conditional_failure_task(failure_count, **_):
-    invocations = global_test_holder.setdefault('invocations', [])
-    try:
-        if len(invocations) < failure_count:
-            raise RuntimeError
-    finally:
-        invocations.append(time.time())
-
-
-@operation
-def mock_sleep_task(seconds, **_):
-    _add_invocation_timestamp()
-    time.sleep(seconds)
-
-
-@operation
-def mock_task_retry(ctx, message, retry_interval=None, **_):
-    _add_invocation_timestamp()
-    retry_kwargs = {}
-    if retry_interval is not None:
-        retry_kwargs['retry_interval'] = retry_interval
-    ctx.task.retry(message, **retry_kwargs)
-
-
-@operation
-def mock_task_abort(ctx, message, **_):
-    _add_invocation_timestamp()
-    ctx.task.abort(message)
-
-
-def _add_invocation_timestamp():
-    invocations = global_test_holder.setdefault('invocations', [])
-    invocations.append(time.time())

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/f36fe86c/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 5dd2855..398ca7e 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -15,6 +15,7 @@
 
 from networkx import topological_sort, DiGraph
 
+from aria.modeling import models
 from aria.orchestrator import context
 from aria.orchestrator.workflows import api, core
 from aria.orchestrator.workflows.executor import base
@@ -68,7 +69,8 @@ def test_task_graph_into_execution_graph(tmpdir):
     execution_graph = DiGraph()
     core.translation.build_execution_graph(task_graph=test_task_graph,
                                            execution_graph=execution_graph,
-                                           default_executor=base.StubTaskExecutor())
+                                           execution=task_context.model.execution.list()[0],
+                                           default_executor=base.StubTaskExecutor)
     execution_tasks = topological_sort(execution_graph)
 
     assert len(execution_tasks) == 7
@@ -84,29 +86,32 @@ def test_task_graph_into_execution_graph(tmpdir):
     ]
 
     assert expected_tasks_names == execution_tasks
+    assert all(isinstance(_get_task_by_name(task_name, execution_graph), models.Task)
+               for task_name in execution_tasks)
 
-    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
-                      core.task.StartWorkflowTask)
+    first_task = _get_task_by_name(execution_tasks[0], execution_graph)
+    assert first_task.stub_type == models.Task.START_WORKFLOW
 
-    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph),
-                                  simple_before_task)
-    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
-                      core.task.StartSubWorkflowTask)
+    second_task = _get_task_by_name(execution_tasks[1], execution_graph)
+    _assert_execution_is_api_task(second_task, simple_before_task)
 
-    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph),
-                                  inner_task)
-    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
-                      core.task.EndSubWorkflowTask)
+    third_task = _get_task_by_name(execution_tasks[2], execution_graph)
+    assert third_task.stub_type == models.Task.START_SUBWORKFLOW
 
-    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph),
-                                  simple_after_task)
-    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph),
-                      core.task.EndWorkflowTask)
+    fourth_task = _get_task_by_name(execution_tasks[3], execution_graph)
+    _assert_execution_is_api_task(fourth_task, inner_task)
+    fifth_task = _get_task_by_name(execution_tasks[4], execution_graph)
+    assert fifth_task.stub_type == models.Task.END_SUBWORKFLOW
+
+    sixth_task = _get_task_by_name(execution_tasks[5], execution_graph)
+    _assert_execution_is_api_task(sixth_task, simple_after_task)
+    seventh_task = _get_task_by_name(execution_tasks[6], execution_graph)
+    assert seventh_task.stub_type == models.Task.END_WORKFLOW
     storage.release_sqlite_storage(task_context.model)
 
 
 def _assert_execution_is_api_task(execution_task, api_task):
-    assert execution_task.id == api_task.id
+    assert execution_task.api_id == api_task.id
     assert execution_task.name == api_task.name
     assert execution_task.function == api_task.function
     assert execution_task.actor == api_task.actor


