ariatosca-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: ARIA-294 Workflow tasks execution is not in order [Forced Update!]
Date Wed, 28 Jun 2017 08:04:00 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-294-Workflow-tasks-execution-is-not-in-order 9525df17a -> 62e67c1ca (forced
update)


ARIA-294 Workflow tasks execution is not in order


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/62e67c1c
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/62e67c1c
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/62e67c1c

Branch: refs/heads/ARIA-294-Workflow-tasks-execution-is-not-in-order
Commit: 62e67c1ca39cfc658719230a49856ac01f0005a4
Parents: 7bba3ab
Author: max-orlov <maxim@gigaspaces.com>
Authored: Tue Jun 27 20:32:08 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Wed Jun 28 11:03:55 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py                  |   7 +-
 aria/modeling/relationship.py                   | 106 ++++++++-----------
 aria/modeling/service_common.py                 |   2 +-
 aria/modeling/service_instance.py               |   4 +-
 aria/modeling/service_template.py               |   2 +-
 aria/orchestrator/workflow_runner.py            |   4 +-
 .../workflows/core/graph_compiler.py            |   4 +-
 tests/orchestrator/test_workflow_runner.py      |  10 +-
 .../test_task_graph_into_execution_graph.py     |  89 +++++++++++++---
 .../workflows/executor/test_process_executor.py |   2 +-
 10 files changed, 130 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 5b02d1b..ab389d3 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -397,13 +397,8 @@ class TaskBase(mixins.ModelMixin):
         raise TaskRetryException(message, retry_interval=retry_interval)
 
     @declared_attr
-    def dependency_fk(self):
-        return relationship.foreign_key('task', nullable=True)
-
-    @declared_attr
     def dependencies(cls):
-        # symmetric relationship causes funky graphs
-        return relationship.one_to_many_self(cls, 'dependency_fk')
+        return relationship.many_to_many(cls, 'task', self=True)
 
     def has_ended(self):
         return self.status in (self.SUCCESS, self.FAILED)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/modeling/relationship.py
----------------------------------------------------------------------
diff --git a/aria/modeling/relationship.py b/aria/modeling/relationship.py
index 40be5b2..30d174f 100644
--- a/aria/modeling/relationship.py
+++ b/aria/modeling/relationship.py
@@ -90,35 +90,6 @@ def one_to_one_self(model_class, fk):
     )
 
 
-def one_to_many_self(model_class, fk, dict_key=None):
-    """
-    Declare a one-to-many relationship property. The property value would be a list or dict
of
-    instances of the same model.
-
-    You will need an associated foreign key to our own table.
-
-    *This utility method should only be used during class creation.*
-
-    :param model_class: The class in which this relationship will be declared
-    :type model_class: type
-    :param fk: Foreign key name
-    :type fk: basestring
-    :param dict_key: If set the value will be a dict with this key as the dict key; otherwise
will
-                     be a list
-    :type dict_key: basestring
-    """
-    return _relationship(
-        model_class,
-        model_class.__tablename__,
-        relationship_kwargs={
-            'remote_side': '{model_class}.{remote_column}'.format(
-                model_class=model_class.__name__, remote_column=fk)
-        },
-        back_populates=False,
-        dict_key=dict_key
-    )
-
-
 def one_to_one(model_class,
                other_table,
                fk=None,
@@ -162,11 +133,12 @@ def one_to_one(model_class,
 
 
 def one_to_many(model_class,
-                child_table,
-                child_fk=None,
+                other_table=None,
+                other_fk=None,
                 dict_key=None,
                 back_populates=None,
-                rel_kwargs=None):
+                rel_kwargs=None,
+                self=False):
     """
     Declare a one-to-many relationship property. The property value would be a list or dict
of
     instances of the child table's model.
@@ -181,9 +153,9 @@ def one_to_many(model_class,
     :param model_class: The class in which this relationship will be declared
     :type model_class: type
     :param child_table: Child table name
-    :type child_table: basestring
-    :param child_fk: Foreign key name at the child table (no need specify if there's no ambiguity)
-    :type child_fk: basestring
+    :type other_table: basestring
+    :param other_fk: Foreign key name at the child table (no need specify if there's no ambiguity)
+    :type other_fk: basestring
     :param dict_key: If set the value will be a dict with this key as the dict key; otherwise
will
                      be a list
     :type dict_key: basestring
@@ -191,18 +163,28 @@ def one_to_many(model_class,
                            false to disable
     :type back_populates: basestring|bool
     """
-    rel_kwargs = rel_kwargs or {}
-    rel_kwargs.setdefault('cascade', 'all')
-    if back_populates is None:
-        back_populates = model_class.__tablename__
+    relationship_kwargs = rel_kwargs or {}
+    if self:
+        assert other_fk
+        other_table_name = model_class.__tablename__
+        back_populates = False
+        relationship_kwargs['remote_side'] = '{model}.{column}'.format(model=model_class.__name__,
+                                                                       column=other_fk)
+
+    else:
+        assert other_table
+        other_table_name = other_table
+        if back_populates is None:
+            back_populates = model_class.__tablename__
+        relationship_kwargs.setdefault('cascade', 'all')
 
     return _relationship(
         model_class,
-        child_table,
+        other_table_name,
         back_populates=back_populates,
-        other_fk=child_fk,
+        other_fk=other_fk,
         dict_key=dict_key,
-        relationship_kwargs=rel_kwargs)
+        relationship_kwargs=relationship_kwargs)
 
 
 def many_to_one(model_class,
@@ -247,7 +229,8 @@ def many_to_many(model_class,
                  other_table,
                  prefix=None,
                  dict_key=None,
-                 other_property=None):
+                 other_property=None,
+                 self=False):
     """
     Declare a many-to-many relationship property. The property value would be a list or dict
of
     instances of the other table's model.
@@ -280,8 +263,8 @@ def many_to_many(model_class,
     this_column_name = '{0}_id'.format(this_table)
     this_foreign_key = '{0}.id'.format(this_table)
 
-    other_column_name = '{0}_id'.format(other_table)
-    other_foreign_key = '{0}.id'.format(other_table)
+    other_column_name = '{0}_{1}'.format(other_table, 'self_ref_id' if self else 'id')
+    other_foreign_key = '{0}.{1}'.format(other_table, 'id')
 
     secondary_table_name = '{0}_{1}'.format(this_table, other_table)
 
@@ -299,13 +282,20 @@ def many_to_many(model_class,
         other_foreign_key
     )
 
-    return _relationship(
-        model_class,
-        other_table,
-        relationship_kwargs={'secondary': secondary_table},
-        backref_kwargs={'name': other_property, 'uselist': True} if other_property else None,
-        dict_key=dict_key
-    )
+    kwargs = {'relationship_kwargs': {'secondary': secondary_table}}
+
+    if self:
+        kwargs['back_populates'] = NO_BACK_POP
+        kwargs['relationship_kwargs']['primaryjoin'] = \
+                    getattr(model_class, 'id') == getattr(secondary_table.c, this_column_name)
+        kwargs['relationship_kwargs']['secondaryjoin'] = \
+            getattr(model_class, 'id') == getattr(secondary_table.c, other_column_name)
+    else:
+        kwargs['backref_kwargs'] = \
+            {'name': other_property, 'uselist': True} if other_property else None
+        kwargs['dict_key'] = dict_key
+
+    return _relationship(model_class, other_table, **kwargs)
 
 
 def _relationship(model_class,
@@ -368,14 +358,6 @@ def _get_secondary_table(metadata,
     return Table(
         name,
         metadata,
-        Column(
-            first_column,
-            Integer,
-            ForeignKey(first_foreign_key)
-        ),
-        Column(
-            second_column,
-            Integer,
-            ForeignKey(second_foreign_key)
-        )
+        Column(first_column, Integer, ForeignKey(first_foreign_key)),
+        Column(second_column, Integer, ForeignKey(second_foreign_key))
     )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/modeling/service_common.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_common.py b/aria/modeling/service_common.py
index 272dfd7..0bb861f 100644
--- a/aria/modeling/service_common.py
+++ b/aria/modeling/service_common.py
@@ -320,7 +320,7 @@ class TypeBase(InstanceModelMixin):
 
     @declared_attr
     def children(cls):
-        return relationship.one_to_many_self(cls, 'parent_type_fk')
+        return relationship.one_to_many(cls, other_fk='parent_type_fk', self=True)
 
     # region foreign keys
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/modeling/service_instance.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_instance.py b/aria/modeling/service_instance.py
index 2bf9872..b451889 100644
--- a/aria/modeling/service_instance.py
+++ b/aria/modeling/service_instance.py
@@ -483,7 +483,7 @@ class NodeBase(InstanceModelMixin):
     @declared_attr
     def outbound_relationships(cls):
         return relationship.one_to_many(
-            cls, 'relationship', child_fk='source_node_fk', back_populates='source_node',
+            cls, 'relationship', other_fk='source_node_fk', back_populates='source_node',
             rel_kwargs=dict(
                 order_by='Relationship.source_position',
                 collection_class=ordering_list('source_position', count_from=0)
@@ -493,7 +493,7 @@ class NodeBase(InstanceModelMixin):
     @declared_attr
     def inbound_relationships(cls):
         return relationship.one_to_many(
-            cls, 'relationship', child_fk='target_node_fk', back_populates='target_node',
+            cls, 'relationship', other_fk='target_node_fk', back_populates='target_node',
             rel_kwargs=dict(
                 order_by='Relationship.target_position',
                 collection_class=ordering_list('target_position', count_from=0)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/modeling/service_template.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_template.py b/aria/modeling/service_template.py
index 4d1e837..344da6d 100644
--- a/aria/modeling/service_template.py
+++ b/aria/modeling/service_template.py
@@ -493,7 +493,7 @@ class NodeTemplateBase(TemplateModelMixin):
 
     @declared_attr
     def requirement_templates(cls):
-        return relationship.one_to_many(cls, 'requirement_template', child_fk='node_template_fk')
+        return relationship.one_to_many(cls, 'requirement_template', other_fk='node_template_fk')
 
     @declared_attr
     def properties(cls):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 4a50fb2..3d58386 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -97,8 +97,8 @@ class WorkflowRunner(object):
         if not self._is_resume:
             workflow_fn = self._get_workflow_fn()
             self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
-            graph_compiler.GraphCompiler(self._workflow_context, executor.__class__).compile(
-                self._tasks_graph)
+            compiler = graph_compiler.GraphCompiler(self._workflow_context, executor.__class__)
+            compiler.compile(self._tasks_graph)
 
         self._engine = engine.Engine(executors={executor.__class__: executor})
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/aria/orchestrator/workflows/core/graph_compiler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/graph_compiler.py b/aria/orchestrator/workflows/core/graph_compiler.py
index f339038..81543d5 100644
--- a/aria/orchestrator/workflows/core/graph_compiler.py
+++ b/aria/orchestrator/workflows/core/graph_compiler.py
@@ -37,7 +37,6 @@ class GraphCompiler(object):
         :param end_stub_type: internal use
         :param depends_on: internal use
         """
-        task_graph = task_graph or self._task_graph
         depends_on = list(depends_on)
 
         # Insert start marker
@@ -110,8 +109,7 @@ class GraphCompiler(object):
         """
         tasks = []
         for dependency in dependencies:
-            if getattr(dependency, 'actor', False):
-                # This is
+            if isinstance(dependency, (api.task.StubTask, api.task.OperationTask)):
                 dependency_name = dependency.id
             else:
                 dependency_name = self._end_graph_suffix(dependency.id)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 103596b..e640c7d 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -350,11 +350,11 @@ class TestResumableWorkflows(object):
         if events['execution_ended'].wait(60) is False:
             raise TimeoutError("Execution did not end")
 
-        first_task, second_task = workflow_context.model.task.list(filters={'_stub_type':
None})
-        assert first_task.status == first_task.SUCCESS
-        assert second_task.status in (second_task.FAILED, second_task.RETRYING)
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        assert any(task.status == task.SUCCESS for task in tasks)
+        assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
         events['is_resumed'].set()
-        assert second_task.status in (second_task.FAILED, second_task.RETRYING)
+        assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
 
         # Create a new workflow runner, with an existing execution id. This would cause
         # the old execution to restart.
@@ -370,7 +370,7 @@ class TestResumableWorkflows(object):
         new_wf_runner.execute()
 
         # Wait for it to finish and assert changes.
-        assert second_task.status == second_task.SUCCESS
+        assert all(task.status == task.SUCCESS for task in tasks)
         assert node.attributes['invocations'].value == 3
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index f0d2b26..e24c901 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -26,15 +26,17 @@ from tests import storage
 
 def test_task_graph_into_execution_graph(tmpdir):
     interface_name = 'Standard'
-    operation_name = 'create'
+    op1_name, op2_name, op3_name = 'create', 'configure', 'start'
     workflow_context = mock.context.simple(str(tmpdir))
     node = workflow_context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
     interface = mock.models.create_interface(
         node.service,
         interface_name,
-        operation_name,
+        op1_name,
         operation_kwargs=dict(function='test')
     )
+    interface.operations[op2_name] = mock.models.create_operation(op2_name)             
           # pylint: disable=unsubscriptable-object
+    interface.operations[op3_name] = mock.models.create_operation(op3_name)             
           # pylint: disable=unsubscriptable-object
     node.interfaces[interface.name] = interface
     workflow_context.model.node.update(node)
 
@@ -46,18 +48,31 @@ def test_task_graph_into_execution_graph(tmpdir):
         simple_before_task = api.task.OperationTask(
             node,
             interface_name=interface_name,
-            operation_name=operation_name)
+            operation_name=op1_name)
         simple_after_task = api.task.OperationTask(
             node,
             interface_name=interface_name,
-            operation_name=operation_name)
+            operation_name=op1_name)
 
         inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph')
-        inner_task = api.task.OperationTask(
+        inner_task_1 = api.task.OperationTask(
             node,
             interface_name=interface_name,
-            operation_name=operation_name)
-        inner_task_graph.add_tasks(inner_task)
+            operation_name=op1_name)
+        inner_task_2 = api.task.OperationTask(
+            node,
+            interface_name=interface_name,
+            operation_name=op2_name)
+        inner_task_3 = api.task.OperationTask(
+            node,
+            interface_name=interface_name,
+            operation_name=op3_name)
+        inner_task_graph.add_tasks(inner_task_1)
+        inner_task_graph.add_tasks(inner_task_2)
+        inner_task_graph.add_tasks(inner_task_3)
+        inner_task_graph.add_dependency(inner_task_2, inner_task_1)
+        inner_task_graph.add_dependency(inner_task_3, inner_task_1)
+        inner_task_graph.add_dependency(inner_task_3, inner_task_2)
 
     test_task_graph.add_tasks(simple_before_task)
     test_task_graph.add_tasks(simple_after_task)
@@ -70,13 +85,15 @@ def test_task_graph_into_execution_graph(tmpdir):
 
     execution_tasks = topological_sort(_graph(workflow_context.execution.tasks))
 
-    assert len(execution_tasks) == 7
+    assert len(execution_tasks) == 9
 
     expected_tasks_names = [
         '{0}-Start'.format(test_task_graph.id),
         simple_before_task.id,
         '{0}-Start'.format(inner_task_graph.id),
-        inner_task.id,
+        inner_task_1.id,
+        inner_task_2.id,
+        inner_task_3.id,
         '{0}-End'.format(inner_task_graph.id),
         simple_after_task.id,
         '{0}-End'.format(test_task_graph.id)
@@ -86,17 +103,55 @@ def test_task_graph_into_execution_graph(tmpdir):
     assert all(isinstance(task, models.Task) for task in execution_tasks)
     execution_tasks = iter(execution_tasks)
 
-    assert next(execution_tasks)._stub_type == models.Task.START_WORKFLOW
-    _assert_execution_is_api_task(next(execution_tasks), simple_before_task)
-    assert next(execution_tasks)._stub_type == models.Task.START_SUBWROFKLOW
-    _assert_execution_is_api_task(next(execution_tasks), inner_task)
-    assert next(execution_tasks)._stub_type == models.Task.END_SUBWORKFLOW
-    _assert_execution_is_api_task(next(execution_tasks), simple_after_task)
-    assert next(execution_tasks)._stub_type == models.Task.END_WORKFLOW
-
+    _assert_tasks(
+        iter(execution_tasks),
+        iter([simple_after_task, inner_task_1, inner_task_2, inner_task_3, simple_after_task])
+    )
     storage.release_sqlite_storage(workflow_context.model)
 
 
+def _assert_tasks(execution_tasks, api_tasks):
+    start_workflow_exec_task = next(execution_tasks)
+    assert start_workflow_exec_task._stub_type == models.Task.START_WORKFLOW
+
+    before_exec_task = next(execution_tasks)
+    simple_before_task = next(api_tasks)
+    _assert_execution_is_api_task(before_exec_task, simple_before_task)
+    assert before_exec_task.dependencies == [start_workflow_exec_task]
+
+    start_subworkflow_exec_task = next(execution_tasks)
+    assert start_subworkflow_exec_task._stub_type == models.Task.START_SUBWROFKLOW
+    assert start_subworkflow_exec_task.dependencies == [before_exec_task]
+
+    inner_exec_task_1 = next(execution_tasks)
+    inner_task_1 = next(api_tasks)
+    _assert_execution_is_api_task(inner_exec_task_1, inner_task_1)
+    assert inner_exec_task_1.dependencies == [start_subworkflow_exec_task]
+
+    inner_exec_task_2 = next(execution_tasks)
+    inner_task_2 = next(api_tasks)
+    _assert_execution_is_api_task(inner_exec_task_2, inner_task_2)
+    assert inner_exec_task_2.dependencies == [inner_exec_task_1]
+
+    inner_exec_task_3 = next(execution_tasks)
+    inner_task_3 = next(api_tasks)
+    _assert_execution_is_api_task(inner_exec_task_3, inner_task_3)
+    assert sorted(inner_exec_task_3.dependencies) == sorted([inner_exec_task_1, inner_exec_task_2])
+
+    end_subworkflow_exec_task = next(execution_tasks)
+    assert end_subworkflow_exec_task._stub_type == models.Task.END_SUBWORKFLOW
+    assert end_subworkflow_exec_task.dependencies == [inner_exec_task_3]
+
+    after_exec_task = next(execution_tasks)
+    simple_after_task = next(api_tasks)
+    _assert_execution_is_api_task(after_exec_task, simple_after_task)
+    assert after_exec_task.dependencies == [end_subworkflow_exec_task]
+
+    end_workflow_exec_task = next(execution_tasks)
+    assert end_workflow_exec_task._stub_type == models.Task.END_WORKFLOW
+    assert end_workflow_exec_task.dependencies == [after_exec_task]
+
+
 def _assert_execution_is_api_task(execution_task, api_task):
     assert execution_task.name == api_task.name
     assert execution_task.function == api_task.function

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62e67c1c/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 6f5c827..6cac288 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -99,7 +99,7 @@ class TestProcessExecutor(object):
         executor.terminate(ctx.task.id)
 
         # Give a chance to the processes to terminate
-        time.sleep(2)
+        time.sleep(10) # windows might require more time
         assert not any(p.pid == pid and p.status() != psutil.STATUS_ZOMBIE
                        for p in psutil.process_iter()
                        for pid in pids)


Mime
View raw message