ariatosca-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject [12/12] incubator-ariatosca git commit: wip
Date Sun, 02 Jul 2017 18:44:33 GMT
wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/691334ef
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/691334ef
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/691334ef

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: 691334ef97606b943c98002b42833529cda2245b
Parents: 3583f8c
Author: max-orlov <maxim@gigaspaces.com>
Authored: Sun Jul 2 21:43:43 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Sun Jul 2 21:44:14 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py             |   4 +-
 aria/orchestrator/workflow_runner.py       |   7 +-
 aria/orchestrator/workflows/core/engine.py |  28 ++++--
 tests/orchestrator/test_workflow_runner.py | 121 ++++++++++++++++++++----
 4 files changed, 134 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/691334ef/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 7068557..4d7a5b5 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -65,7 +65,9 @@ class ExecutionBase(mixins.ModelMixin):
         PENDING: (STARTED, CANCELLED),
         STARTED: END_STATES + (CANCELLING,),
         CANCELLING: END_STATES,
-        CANCELLED: PENDING
+        # Retrying
+        CANCELLED: PENDING,
+        FAILED: PENDING
     }
 
     # region one_to_many relationships

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/691334ef/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index df1725f..b7de7b5 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -38,7 +38,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
 class WorkflowRunner(object):
 
     def __init__(self, model_storage, resource_storage, plugin_manager,
-                 execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None,
+                 execution_id=None, retry_failed=False,
+                 service_id=None, workflow_name=None, inputs=None, executor=None,
                  task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                  task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
         """
@@ -62,6 +63,7 @@ class WorkflowRunner(object):
                 "and service id with inputs")
 
         self._is_resume = execution_id is not None
+        self._retry_failed = retry_failed
 
         self._model_storage = model_storage
         self._resource_storage = resource_storage
@@ -116,7 +118,8 @@ class WorkflowRunner(object):
         return self._model_storage.service.get(self._service_id)
 
     def execute(self):
-        self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume)
+        self._engine.execute(
+            ctx=self._workflow_context, resuming=self._is_resume, retry_failing=self._retry_failed)
 
     def cancel(self):
         self._engine.cancel_execution(ctx=self._workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/691334ef/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d9c77e9..151c3ff 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,14 +41,27 @@ class Engine(logger.LoggerMixin):
         self._executors = executors.copy()
         self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
 
-    def execute(self, ctx, resuming=False):
+    def execute(self, ctx, resuming=False, retry_failing=False):
         """
         Executes the workflow.
         """
         if resuming:
             events.on_resume_workflow_signal.send(ctx)
 
-        tasks_tracker = _TasksTracker(ctx)
+        tasks_tracker = _TasksTracker(
+            ctx,
+            # only succeeded tasks have ended (for retry failing)
+            has_task_ended_func=
+            lambda t: (t.status == t.SUCCESS or (t.status == t.FAILED and t.ignore_failure))
+            if retry_failing else
+            None,
+            # failed tasks are waiting to be executed (for retry failing)
+            is_task_waiting_func=
+            lambda t: (t.is_waiting() or (t.FAILED and not t.ignore_failure))
+            if retry_failing else
+            None
+        )
+
         try:
             events.start_workflow_signal.send(ctx)
             while True:
@@ -124,10 +137,13 @@ class Engine(logger.LoggerMixin):
 
 
 class _TasksTracker(object):
-    def __init__(self, ctx):
+    def __init__(self, ctx, has_task_ended_func=None, is_task_waiting_func=None):
         self._ctx = ctx
+
+        has_task_ended = has_task_ended_func or (lambda t: t.has_ended())
+        self._is_task_waiting = is_task_waiting_func or (lambda t: t.is_waiting())
         self._tasks = ctx.execution.tasks
-        self._executed_tasks = [task for task in self._tasks if task.has_ended()]
+        self._executed_tasks = [task for task in self._tasks if has_task_ended(task)]
         self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
         self._executing_tasks = []
 
@@ -155,8 +171,8 @@ class _TasksTracker(object):
     def executable_tasks(self):
         now = datetime.utcnow()
         # we need both lists since retrying task are in the executing task list.
-        for task in self._update_tasks(self._executing_tasks + self._executable_tasks):
-            if all([task.is_waiting(),
+        for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
+            if all([self._is_task_waiting(task),
                     task.due_at <= now,
                     all(dependency in self._executed_tasks for dependency in task.dependencies)
                    ]):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/691334ef/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index e640c7d..adeb274 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -23,7 +23,7 @@ import pytest
 from aria.modeling import exceptions as modeling_exceptions
 from aria.modeling import models
 from aria.orchestrator import exceptions
-from aria.orchestrator.events import on_cancelled_workflow_signal
+from aria.orchestrator import events
 from aria.orchestrator.workflow_runner import WorkflowRunner
 from aria.orchestrator.workflows.executor.process import ProcessExecutor
 from aria.orchestrator.workflows import api
@@ -46,9 +46,10 @@ from ..fixtures import (  # pylint: disable=unused-import
     resource_storage as resource
 )
 
-events = {
+custom_events = {
     'is_resumed': Event(),
     'is_active': Event(),
+    'execution_cancelled': Event(),
     'execution_ended': Event()
 }
 
@@ -318,42 +319,53 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
 
 class TestResumableWorkflows(object):
 
-    def test_resume_workflow(self, workflow_context, executor):
-        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
-        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
-        self._create_interface(workflow_context, node, mock_resuming_task)
+    def _create_initial_workflow_runner(
+            self, workflow_context, workflow, executor, inputs=None):
 
         service = workflow_context.service
         service.workflows['custom_workflow'] = tests_mock.models.create_operation(
             'custom_workflow',
-            operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
+            operation_kwargs={
+                'function': '{0}.{1}'.format(__name__, workflow.__name__),
+                'inputs': dict((k, models.Input.wrap(k, v)) for k, v in (inputs or {}).items())
+            }
         )
         workflow_context.model.service.update(service)
 
         wf_runner = WorkflowRunner(
             service_id=workflow_context.service.id,
-            inputs={},
+            inputs=inputs or {},
             model_storage=workflow_context.model,
             resource_storage=workflow_context.resource,
             plugin_manager=None,
             workflow_name='custom_workflow',
             executor=executor)
+        return wf_runner
+
+    def test_resume_workflow(self, workflow_context, executor):
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_resuming_task)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_parallel_workflow, executor)
+
         wf_thread = Thread(target=wf_runner.execute)
         wf_thread.daemon = True
         wf_thread.start()
 
         # Wait for the execution to start
-        if events['is_active'].wait(5) is False:
+        if custom_events['is_active'].wait(5) is False:
             raise TimeoutError("is_active wasn't set to True")
         wf_runner.cancel()
 
-        if events['execution_ended'].wait(60) is False:
+        if custom_events['execution_cancelled'].wait(60) is False:
             raise TimeoutError("Execution did not end")
 
         tasks = workflow_context.model.task.list(filters={'_stub_type': None})
         assert any(task.status == task.SUCCESS for task in tasks)
         assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
-        events['is_resumed'].set()
+        custom_events['is_resumed'].set()
         assert any(task.status in (task.FAILED, task.RETRYING) for task in tasks)
 
         # Create a new workflow runner, with an existing execution id. This would cause
@@ -374,6 +386,49 @@ class TestResumableWorkflows(object):
         assert node.attributes['invocations'].value == 3
         assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
 
+    def test_resume_failed_workflow(self, workflow_context, executor):
+
+        node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        node.attributes['invocations'] = models.Attribute.wrap('invocations', 0)
+        self._create_interface(workflow_context, node, mock_failed_first_task)
+
+        wf_runner = self._create_initial_workflow_runner(
+            workflow_context, mock_sequential_workflow, executor)
+        wf_thread = Thread(target=wf_runner.execute)
+        wf_thread.daemon = True
+        wf_thread.start()
+
+        if custom_events['execution_ended'].wait(60) is False:
+            raise TimeoutError("Execution did not end")
+
+        assert node.attributes['invocations'].value == 1
+        tasks = workflow_context.model.task.list(filters={'_stub_type': None})
+        assert any(task.status == task.SUCCESS for task in tasks)
+        assert any(task.status == task.FAILED for task in tasks)
+        assert wf_runner.execution.status == wf_runner.execution.FAILED
+
+        custom_events['is_resumed'].set()
+
+        # Create a new workflow runner, with an existing execution id. This would cause
+        # the old execution to restart.
+        new_wf_runner = WorkflowRunner(
+            service_id=wf_runner.service.id,
+            retry_failed=True,
+            inputs={},
+            model_storage=workflow_context.model,
+            resource_storage=workflow_context.resource,
+            plugin_manager=None,
+            execution_id=wf_runner.execution.id,
+            executor=executor)
+
+        new_wf_runner.execute()
+
+        # Wait for it to finish and assert changes.
+        print node.attributes['triggerer'].value
+        assert node.attributes['invocations'].value == 2
+        assert all(task.status == task.SUCCESS for task in tasks)
+        assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
     @staticmethod
     @pytest.fixture
     def executor():
@@ -417,16 +472,23 @@ class TestResumableWorkflows(object):
 
     @pytest.fixture(autouse=True)
     def register_to_events(self):
+        def execution_cancelled(*args, **kwargs):
+            custom_events['execution_cancelled'].set()
+
         def execution_ended(*args, **kwargs):
-            events['execution_ended'].set()
+            custom_events['execution_ended'].set()
 
-        on_cancelled_workflow_signal.connect(execution_ended)
+        events.on_cancelled_workflow_signal.connect(execution_cancelled)
+        events.on_failure_workflow_signal.connect(execution_ended)
         yield
-        on_cancelled_workflow_signal.disconnect(execution_ended)
+        events.on_cancelled_workflow_signal.disconnect(execution_cancelled)
+        events.on_failure_workflow_signal.disconnect(execution_ended)
+        for event in custom_events.values():
+            event.clear()
 
 
 @workflow
-def mock_workflow(ctx, graph):
+def mock_parallel_workflow(ctx, graph):
     node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
     graph.add_tasks(
         api.task.OperationTask(
@@ -441,8 +503,33 @@ def mock_resuming_task(ctx):
     ctx.node.attributes['invocations'] += 1
 
     if ctx.node.attributes['invocations'] != 1:
-        events['is_active'].set()
-        if not events['is_resumed'].isSet():
+        custom_events['is_active'].set()
+        if not custom_events['is_resumed'].isSet():
+            # if resume was called, increase by one. o/w fail the execution - second task should
+            # fail as long it was not a part of resuming the workflow
             raise BaseException("wasn't resumed yet")
+
+
+@workflow
+def mock_sequential_workflow(ctx, graph):
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.sequence(
+        api.task.OperationTask(node,
+                               interface_name='aria.interfaces.lifecycle',
+                               operation_name='create'),
+        api.task.OperationTask(node,
+                               interface_name='aria.interfaces.lifecycle',
+                               operation_name='create')
+    )
+
+
+@operation
+def mock_failed_first_task(ctx):
+    if not custom_events['is_resumed'].isSet():
+        if ctx.node.attributes['invocations'] == 1:
+            # Second task should fail (and be retried later)
+            ctx.task.abort()
+    ctx.node.attributes['invocations'] += 1
+    if 'triggerer' not in ctx.node.attributes:
+        ctx.node.attributes['triggerer'] = []
+    ctx.node.attributes['triggerer'].append('{0}:{1}'.format(ctx.task.id, ctx.node.attributes['invocations']))


Mime
View raw message