ariatosca-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: wip
Date Tue, 04 Jul 2017 15:44:11 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions 691334ef9 -> cd91d0124


wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/cd91d012
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/cd91d012
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/cd91d012

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: cd91d0124d3c51dafaa394a404150a52d9b7b22f
Parents: 691334e
Author: max-orlov <maxim@gigaspaces.com>
Authored: Tue Jul 4 18:43:35 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Tue Jul 4 18:43:35 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflows/core/engine.py | 10 ++---
 tests/orchestrator/test_workflow_runner.py | 53 ++++++++++++++-----------
 2 files changed, 35 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/cd91d012/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 151c3ff..cb98b82 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -52,12 +52,12 @@ class Engine(logger.LoggerMixin):
             ctx,
             # only succeeded tasks have ended (for retry failing)
             has_task_ended_func=
-            lambda t: (t.status == t.SUCCESS or (t.status == t.FAILED and t.ignore_failure))
+            (lambda t: (t.status == t.SUCCESS or (t.status == t.FAILED and t.ignore_failure)))
             if retry_failing else
             None,
             # failed tasks are waiting to be executed (for retry failing)
             is_task_waiting_func=
-            lambda t: (t.is_waiting() or (t.FAILED and not t.ignore_failure))
+            (lambda t: t.is_waiting())
             if retry_failing else
             None
         )
@@ -140,10 +140,10 @@ class _TasksTracker(object):
     def __init__(self, ctx, has_task_ended_func=None, is_task_waiting_func=None):
         self._ctx = ctx
 
-        has_task_ended = has_task_ended_func or (lambda t: t.has_ended())
+        self._has_task_ended = has_task_ended_func or (lambda t: t.has_ended())
         self._is_task_waiting = is_task_waiting_func or (lambda t: t.is_waiting())
         self._tasks = ctx.execution.tasks
-        self._executed_tasks = [task for task in self._tasks if has_task_ended(task)]
+        self._executed_tasks = [task for task in self._tasks if self._has_task_ended(task)]
         self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
         self._executing_tasks = []
 
@@ -164,7 +164,7 @@ class _TasksTracker(object):
     @property
     def ended_tasks(self):
         for task in self.executing_tasks:
-            if task.has_ended():
+            if self._has_task_ended(task):
                 yield task
 
     @property

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/cd91d012/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index adeb274..d3fdeae 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import json
+import time
 from threading import Thread, Event
 from datetime import datetime
 
@@ -389,7 +390,6 @@ class TestResumableWorkflows(object):
     def test_resume_failed_workflow(self, workflow_context, executor):
 
         node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
         self._create_interface(workflow_context, node, mock_failed_first_task)
 
         wf_runner = self._create_initial_workflow_runner(
@@ -398,14 +398,17 @@ class TestResumableWorkflows(object):
         wf_thread.daemon = True
         wf_thread.start()
 
-        if custom_events['execution_ended'].wait(60) is False:
+        if custom_events['is_active'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+        wf_runner.cancel()
+        if custom_events['execution_cancelled'].wait(60) is False:
             raise TimeoutError("Execution did not end")
 
-        assert node.attributes['invocations'].value == 1
-        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
-        assert any(task.status == task.SUCCESS for task in tasks)
-        assert any(task.status == task.FAILED for task in tasks)
-        assert wf_runner.execution.status == wf_runner.execution.FAILED
+        task = workflow_context.model.task.list(filters={'_stub_type': None})[0]
+        assert task.attempts_count == 2
+        assert task.status in (task.STARTED, task.RETRYING)
+        assert wf_runner.execution.status in (wf_runner.execution.CANCELLED,
+                                              wf_runner.execution.CANCELLING)
 
         custom_events['is_resumed'].set()
 
@@ -424,10 +427,9 @@ class TestResumableWorkflows(object):
         new_wf_runner.execute()
 
         # Wait for it to finish and assert changes.
-        print node.attributes['triggerer'].value
-        assert node.attributes['invocations'].value == 2
-        assert all(task.status == task.SUCCESS for task in tasks)
-        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+        assert node.attributes['invocations'].value == workflow_context._task_max_attempts
+        assert task.status == task.FAILED
+        assert wf_runner.execution.status == wf_runner.execution.FAILED
 
     @staticmethod
     @pytest.fixture
@@ -516,20 +518,25 @@ def mock_sequential_workflow(ctx, graph):
     graph.sequence(
         api.task.OperationTask(node,
                                interface_name='aria.interfaces.lifecycle',
-                               operation_name='create'),
-        api.task.OperationTask(node,
-                               interface_name='aria.interfaces.lifecycle',
-                               operation_name='create')
+                               operation_name='create',
+                               retry_interval=1,
+                               max_attempts=5),
     )
 
 
 @operation
 def mock_failed_first_task(ctx):
-    if not custom_events['is_resumed'].isSet():
-        if ctx.node.attributes['invocations'] == 1:
-            # Second task should fail (an be retried later)
-            ctx.task.abort()
-    ctx.node.attributes['invocations'] += 1
-    if 'triggerer' not in ctx.node.attributes:
-        ctx.node.attributes['triggerer'] = []
-    ctx.node.attributes['triggerer'].append('{0}:{1}'.format(ctx.task.id, ctx.node.attributes['invocations']))
+    """
+    On the second attempt, signal 'is_active' and busy-wait until cancelled; otherwise raise.
+    :param ctx: 
+    :return: 
+    """
+
+    if ctx.task.attempts_count == 2:
+        custom_events['is_active'].set()
+        while True:
+            pass
+
+    if not custom_events['is_resumed'].set():
+        raise BaseException("stop this task")
+


Mime
View raw message