ariatosca-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: review 1 fixes
Date Tue, 11 Jul 2017 13:49:20 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions 123a55ce7 -> 79dc7e75e


review 1 fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/79dc7e75
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/79dc7e75
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/79dc7e75

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: 79dc7e75ec46bd036485431957548b1c3b06dfed
Parents: 123a55c
Author: max-orlov <maxim@gigaspaces.com>
Authored: Tue Jul 11 16:49:15 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Tue Jul 11 16:49:15 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflow_runner.py            |  9 +-
 aria/orchestrator/workflows/core/engine.py      |  9 +-
 .../workflows/core/events_handler.py            |  1 +
 tests/orchestrator/test_workflow_runner.py      | 89 ++++++++++++++++++--
 4 files changed, 87 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79dc7e75/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 2bd3043..a85e7d3 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -38,7 +38,7 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
 class WorkflowRunner(object):
 
     def __init__(self, model_storage, resource_storage, plugin_manager,
-                 execution_id=None, retry_failed=False,
+                 execution_id=None, retry_failed_tasks=False,
                  service_id=None, workflow_name=None, inputs=None, executor=None,
                  task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                  task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
@@ -63,7 +63,7 @@ class WorkflowRunner(object):
                 "and service id with inputs")
 
         self._is_resume = execution_id is not None
-        self._retry_failed = retry_failed
+        self._retry_failed_tasks = retry_failed_tasks
 
         self._model_storage = model_storage
         self._resource_storage = resource_storage
@@ -118,8 +118,9 @@ class WorkflowRunner(object):
         return self._model_storage.service.get(self._service_id)
 
     def execute(self):
-        self._engine.execute(
-            ctx=self._workflow_context, resuming=self._is_resume, retry_failed=self._retry_failed)
+        self._engine.execute(ctx=self._workflow_context,
+                             resuming=self._is_resume,
+                             retry_failed=self._retry_failed_tasks)
 
     def cancel(self):
         self._engine.cancel_execution(ctx=self._workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79dc7e75/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 69505fc..0ec3cd8 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -69,15 +69,8 @@ class Engine(logger.LoggerMixin):
             if cancel:
                 self._terminate_tasks(tasks_tracker.executing_tasks)
                 events.on_cancelled_workflow_signal.send(ctx)
-            elif all(task.status == task.SUCCESS or task.ignore_failure
-                     for task in ctx.execution.tasks):
-                events.on_success_workflow_signal.send(ctx)
             else:
-                exception = "Tasks {tasks} remain failed".format(
-                    tasks=
-                    [t for t in ctx.execution.tasks if t.status == t.SUCCESS or t.ignore_failure]
-                )
-                events.on_failure_workflow_signal.send(ctx, exception=exception)
+                events.on_success_workflow_signal.send(ctx)
         except BaseException as e:
             # Cleanup any remaining tasks
             self._terminate_tasks(tasks_tracker.executing_tasks)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79dc7e75/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 5ac1ce8..219d2df 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -71,6 +71,7 @@ def _task_succeeded(ctx, *args, **kwargs):
     with ctx.persist_changes:
         ctx.task.ended_at = datetime.utcnow()
         ctx.task.status = ctx.task.SUCCESS
+        ctx.task.attempts_count += 1
 
         _update_node_state_if_necessary(ctx)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/79dc7e75/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index adb19e6..30176ae 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -359,7 +359,7 @@ class TestResumableWorkflows(object):
     def test_resume_workflow(self, workflow_context, thread_executor):
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_failed_task)
+        self._create_interface(workflow_context, node, mock_pass_first_task_only)
 
         wf_runner = self._create_initial_workflow_runner(
             workflow_context, mock_parallel_tasks_workflow, thread_executor,
@@ -491,13 +491,13 @@ class TestResumableWorkflows(object):
     def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor):
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
         node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_failed_task)
+        self._create_interface(workflow_context, node, mock_pass_first_task_only)
 
         wf_runner = self._create_initial_workflow_runner(
             workflow_context,
             mock_parallel_tasks_workflow,
             thread_executor,
-            inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
+            inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2}
         )
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.setDaemon(True)
@@ -508,10 +508,12 @@ class TestResumableWorkflows(object):
 
         tasks = workflow_context.model.task.list(filters={'_stub_type': None})
         node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == 2
+        assert node.attributes['invocations'].value == 3
+        failed_task = [t for t in tasks if t.status == t.FAILED][0]
 
         # Second task fails
         assert any(task.status == task.FAILED for task in tasks)
+        assert failed_task.attempts_count == 2
         # First task passes
         assert any(task.status == task.SUCCESS for task in tasks)
         assert wf_runner.execution.status in wf_runner.execution.FAILED
@@ -521,7 +523,7 @@ class TestResumableWorkflows(object):
         try:
             new_wf_runner = WorkflowRunner(
                 service_id=wf_runner.service.id,
-                retry_failed=True,
+                retry_failed_tasks=True,
                 inputs={},
                 model_storage=workflow_context.model,
                 resource_storage=workflow_context.resource,
@@ -535,10 +537,60 @@ class TestResumableWorkflows(object):
 
         # Wait for it to finish and assert changes.
         node = workflow_context.model.node.refresh(node)
-        assert node.attributes['invocations'].value == 3
+        assert failed_task.attempts_count == 1
+        assert node.attributes['invocations'].value == 4
         assert all(task.status == task.SUCCESS for task in tasks)
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
+    def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_fail_first_task_only)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context,
+            mock_sequential_tasks_workflow,
+            thread_executor,
+            inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2}
+        )
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.setDaemon(True)
+        wf_thread.start()
+
+        if custom_events['execution_failed'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 1
+        assert any(t.status == t.FAILED for t in tasks)
+        assert any(t.status == t.PENDING for t in tasks)
+
+        custom_events['is_resumed'].set()
+        new_thread_executor = thread.ThreadExecutor()
+        try:
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=new_thread_executor)
+
+            new_wf_runner.execute()
+        finally:
+            new_thread_executor.close()
+
+        # Wait for it to finish and assert changes.
+        node = workflow_context.model.node.refresh(node)
+        assert node.attributes['invocations'].value == 2
+        assert any(t.status == t.SUCCESS for t in tasks)
+        assert any(t.status == t.FAILED for t in tasks)
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+
+
     @staticmethod
     @pytest.fixture
     def thread_executor():
@@ -598,10 +650,21 @@ class TestResumableWorkflows(object):
 
 
 @workflow
+def mock_sequential_tasks_workflow(ctx, graph,
+                                   retry_interval=1, max_attempts=10, number_of_tasks=1):
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
+
+
+@workflow
 def mock_parallel_tasks_workflow(ctx, graph,
                                  retry_interval=1, max_attempts=10, number_of_tasks=1):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-    tasks = [
+    graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks))
+
+
+def _create_tasks(node, retry_interval, max_attempts, number_of_tasks):
+    return [
         api.task.OperationTask(node,
                                'aria.interfaces.lifecycle',
                                'create',
@@ -609,7 +672,7 @@ def mock_parallel_tasks_workflow(ctx, graph,
                                max_attempts=max_attempts)
         for _ in xrange(number_of_tasks)
     ]
-    graph.add_tasks(*tasks)
+
 
 
 @operation
@@ -644,7 +707,7 @@ def mock_stuck_task(ctx):
 
 
 @operation
-def mock_failed_task(ctx):
+def mock_pass_first_task_only(ctx):
     ctx.node.attributes['invocations'] += 1
 
     if ctx.node.attributes['invocations'] != 1:
@@ -653,3 +716,11 @@ def mock_failed_task(ctx):
             # if resume was called, increase by one. o/w fail the execution - second task should
             # fail as long it was not a part of resuming the workflow
             raise FailingTask("wasn't resumed yet")
+
+
+@operation
+def mock_fail_first_task_only(ctx):
+    ctx.node.attributes['invocations'] += 1
+
+    if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1:
+        raise FailingTask("First task should fail")


Mime
View raw message