ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dankil...@apache.org
Subject [08/11] incubator-ariatosca git commit: bug fixes
Date Tue, 08 Nov 2016 12:29:16 GMT
bug fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/b113b5e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/b113b5e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/b113b5e3

Branch: refs/heads/ARIA-10-task-retries
Commit: b113b5e3550b2b93b037d9b281d3797aae31d539
Parents: de18726
Author: Dan Kilman <dank@gigaspaces.com>
Authored: Tue Nov 8 12:53:40 2016 +0200
Committer: Dan Kilman <dank@gigaspaces.com>
Committed: Tue Nov 8 12:53:40 2016 +0200

----------------------------------------------------------------------
 aria/workflows/core/engine.py             |  2 +-
 aria/workflows/core/task.py               | 14 +++++++-------
 aria/workflows/core/translation.py        | 22 +++++++++++++---------
 tests/workflows/core/test_engine.py       | 20 ++++++++++++++++++++
 tests/workflows/executor/test_executor.py |  4 +++-
 5 files changed, 44 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b113b5e3/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
index 1ecff54..65e17c3 100644
--- a/aria/workflows/core/engine.py
+++ b/aria/workflows/core/engine.py
@@ -82,7 +82,7 @@ class Engine(logger.LoggerMixin):
         return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True))
 
     def _handle_executable_task(self, task):
-        if isinstance(task, engine_task.BaseWorkflowTask):
+        if isinstance(task, engine_task.StubTask):
             task.status = models.Task.SUCCESS
         else:
             events.sent_task_signal.send(task)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b113b5e3/aria/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/task.py b/aria/workflows/core/task.py
index 2574c4b..1cded79 100644
--- a/aria/workflows/core/task.py
+++ b/aria/workflows/core/task.py
@@ -41,39 +41,39 @@ class BaseTask(logger.LoggerMixin):
         return self._id
 
 
-class BaseWorkflowTask(BaseTask):
+class StubTask(BaseTask):
     """
-    Base class for all workflow wrapping tasks
+    Base stub task for all tasks that don't actually run anything
     """
 
     def __init__(self, *args, **kwargs):
-        super(BaseWorkflowTask, self).__init__(*args, **kwargs)
+        super(StubTask, self).__init__(*args, **kwargs)
         self.status = models.Task.PENDING
         self.eta = datetime.utcnow()
 
 
-class StartWorkflowTask(BaseWorkflowTask):
+class StartWorkflowTask(StubTask):
     """
     Tasks marking a workflow start
     """
     pass
 
 
-class EndWorkflowTask(BaseWorkflowTask):
+class EndWorkflowTask(StubTask):
     """
     Tasks marking a workflow end
     """
     pass
 
 
-class StartSubWorkflowTask(BaseWorkflowTask):
+class StartSubWorkflowTask(StubTask):
     """
     Tasks marking a subworkflow start
     """
     pass
 
 
-class EndSubWorkflowTask(BaseWorkflowTask):
+class EndSubWorkflowTask(StubTask):
     """
     Tasks marking a subworkflow end
     """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b113b5e3/aria/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/translation.py b/aria/workflows/core/translation.py
index cd9a0e6..a9e411a 100644
--- a/aria/workflows/core/translation.py
+++ b/aria/workflows/core/translation.py
@@ -17,8 +17,9 @@
 Translation of user graph's API to the execution graph
 """
 
-from . import task as core_task
 from .. import api
+from .. import exceptions
+from . import task as core_task
 
 
 def build_execution_graph(
@@ -47,12 +48,12 @@ def build_execution_graph(
             dependencies,
             default=[start_task])
 
-        if _is_operation(api_task):
+        if isinstance(api_task, api.task.OperationTask):
             # Add the task an the dependencies
             operation_task = core_task.OperationTask(api_task)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
-        else:
-            # Built the graph recursively while adding start and end markers
+        elif isinstance(api_task, api.task.WorkflowTask):
+            # Build the graph recursively while adding start and end markers
             build_execution_graph(
                 task_graph=api_task,
                 execution_graph=execution_graph,
@@ -60,6 +61,11 @@ def build_execution_graph(
                 end_cls=core_task.EndSubWorkflowTask,
                 depends_on=operation_dependencies
             )
+        elif isinstance(api_task, api.task.StubTask):
+            stub_task = core_task.StubTask(id=api_task.id)
+            _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies)
+        else:
+            raise RuntimeError('Undefined state')
 
     # Insert end marker
     workflow_dependencies = _get_tasks_from_dependencies(
@@ -80,15 +86,13 @@ def _get_tasks_from_dependencies(execution_graph, dependencies, default=()):
     """
     Returns task list from dependencies.
     """
-    return [execution_graph.node[dependency.id if _is_operation(dependency)
+    return [execution_graph.node[dependency.id
+                                 if isinstance(dependency, (api.task.OperationTask,
+                                                            api.task.StubTask))
                                  else _end_graph_suffix(dependency.id)]['task']
             for dependency in dependencies] or default
 
 
-def _is_operation(task):
-    return isinstance(task, api.task.OperationTask)
-
-
 def _start_graph_suffix(id):
     return '{0}-Start'.format(id)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b113b5e3/tests/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py
index d9abc14..8380470 100644
--- a/tests/workflows/core/test_engine.py
+++ b/tests/workflows/core/test_engine.py
@@ -177,6 +177,26 @@ class TestEngine(BaseTest):
         assert global_test_holder.get('invocations') == [1, 2]
         assert global_test_holder.get('sent_task_signal_calls') == 2
 
+    def test_stub_and_subworkflow_execution(self, workflow_context, executor):
+        @workflow
+        def sub_workflow(ctx, graph):
+            op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
+            op2 = api.task.StubTask()
+            graph.sequence(op1, op2)
+
+        @workflow
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(api.task.WorkflowTask(sub_workflow))
+
+        self._execute(workflow_func=mock_workflow,
+                      workflow_context=workflow_context,
+                      executor=executor)
+
+        assert workflow_context.states == ['start', 'success']
+        assert workflow_context.exception is None
+        assert global_test_holder.get('invocations') == [1]
+        assert global_test_holder.get('sent_task_signal_calls') == 1
+
 
 def mock_success_task():
     pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/b113b5e3/tests/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/workflows/executor/test_executor.py b/tests/workflows/executor/test_executor.py
index 8ec0303..bbbfbc7 100644
--- a/tests/workflows/executor/test_executor.py
+++ b/tests/workflows/executor/test_executor.py
@@ -109,11 +109,13 @@ class MockTask(object):
         self.exception = None
         self.id = str(uuid.uuid4())
         name = func.__name__
-        operation = 'tests.workflows.core.test_executor.{name}'.format(name=name)
+        operation = 'tests.workflows.executor.test_executor.{name}'.format(name=name)
         self.operation_details = {'operation': operation}
         self.logger = logging.getLogger()
         self.name = name
         self.inputs = inputs or {}
+        self.retry_count = 0
+        self.max_retries = 0
 
         for state in models.Task.STATES:
             setattr(self, state.upper(), state)


Mime
View raw message