ariatosca-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject [6/6] incubator-ariatosca git commit: removed additional logic, and just reset the tasks in the event
Date Sun, 09 Jul 2017 11:05:45 GMT
removed additional logic, and just reset the tasks in the event


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/684b38fe
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/684b38fe
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/684b38fe

Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions
Commit: 684b38fe79adf3e5170a6c434598906f28d193dd
Parents: e7e1d89
Author: max-orlov <maxim@gigaspaces.com>
Authored: Wed Jul 5 17:22:00 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Sun Jul 9 14:05:24 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/workflow_runner.py            |  2 +-
 aria/orchestrator/workflows/core/engine.py      | 36 ++++----------------
 .../workflows/core/events_handler.py            |  9 ++++-
 3 files changed, 16 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/684b38fe/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index b7de7b5..5592326 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -119,7 +119,7 @@ class WorkflowRunner(object):
 
     def execute(self):
         self._engine.execute(
-            ctx=self._workflow_context, resuming=self._is_resume, retry_failing=self._retry_failed)
+            ctx=self._workflow_context, resuming=self._is_resume, retry_failed=self._retry_failed)
 
     def cancel(self):
         self._engine.cancel_execution(ctx=self._workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/684b38fe/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index d22b535..0ec3cd8 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -41,14 +41,14 @@ class Engine(logger.LoggerMixin):
         self._executors = executors.copy()
         self._executors.setdefault(StubTaskExecutor, StubTaskExecutor())
 
-    def execute(self, ctx, resuming=False, retry_failing=False):
+    def execute(self, ctx, resuming=False, retry_failed=False):
         """
         Executes the workflow.
         """
         if resuming:
-            events.on_resume_workflow_signal.send(ctx)
+            events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed)
 
-        tasks_tracker = _TasksTracker(ctx, retry_failing)
+        tasks_tracker = _TasksTracker(ctx)
 
         try:
             events.start_workflow_signal.send(ctx)
@@ -126,36 +126,14 @@ class Engine(logger.LoggerMixin):
 
 class _TasksTracker(object):
 
-    def __init__(self, ctx, retry_failing=False):
+    def __init__(self, ctx):
         self._ctx = ctx
 
-        if retry_failing:
-            self._has_task_ended = self._retry_failed_has_task_ended
-            self._is_task_waiting = self._retry_failed_is_task_waiting
-
         self._tasks = ctx.execution.tasks
-        self._executed_tasks = [task for task in self._tasks if self._has_task_ended(task)]
+        self._executed_tasks = [task for task in self._tasks if task.has_ended()]
         self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks))
         self._executing_tasks = []
 
-    @staticmethod
-    def _retry_failed_has_task_ended(task):
-        # only succeeded tasks have ended (for retry failing)
-        return task.status == task.SUCCESS or (task.status == task.FAILED and task.ignore_failure)
-
-    @staticmethod
-    def _retry_failed_is_task_waiting(task):
-        # failed tasks are waiting to be executed (for retry failing)
-        return task.is_waiting()
-
-    @staticmethod
-    def _has_task_ended(task):
-        return task.has_ended()
-
-    @staticmethod
-    def _is_task_waiting(task):
-        return task.is_waiting()
-
     @property
     def all_tasks_consumed(self):
         return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0
@@ -173,7 +151,7 @@ class _TasksTracker(object):
     @property
     def ended_tasks(self):
         for task in self.executing_tasks:
-            if self._has_task_ended(task):
+            if task.has_ended():
                 yield task
 
     @property
@@ -181,7 +159,7 @@ class _TasksTracker(object):
         now = datetime.utcnow()
         # we need both lists since retrying task are in the executing task list.
         for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)):
-            if all([self._is_task_waiting(task),
+            if all([task.is_waiting(),
                     task.due_at <= now,
                     all(dependency in self._executed_tasks for dependency in task.dependencies)
                    ]):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/684b38fe/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index eb6f271..8a6c03d 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -123,11 +123,18 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
 
 
 @events.on_resume_workflow_signal.connect
-def _workflow_resume(workflow_context, *args, **kwargs):
+def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs):
     with workflow_context.persist_changes:
         execution = workflow_context.execution
         execution.status = execution.PENDING
 
+        if retry_failed:
+            for task in execution.tasks:
+                if task.status == task.FAILED and not task.ignore_failure:
+                    task.attempts_count = 0
+                    task.status = task.PENDING
+
+
 
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):


Mime
View raw message