Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D7F87200CC5 for ; Tue, 11 Jul 2017 17:50:39 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CE72E16636B; Tue, 11 Jul 2017 15:50:39 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 76CA0166364 for ; Tue, 11 Jul 2017 17:50:38 +0200 (CEST) Received: (qmail 18339 invoked by uid 500); 11 Jul 2017 15:50:37 -0000 Mailing-List: contact commits-help@ariatosca.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ariatosca.incubator.apache.org Delivered-To: mailing list commits@ariatosca.incubator.apache.org Received: (qmail 18330 invoked by uid 99); 11 Jul 2017 15:50:37 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd3-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Jul 2017 15:50:37 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd3-us-west.apache.org (ASF Mail Server at spamd3-us-west.apache.org) with ESMTP id 26410195836 for ; Tue, 11 Jul 2017 15:50:37 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd3-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.22 X-Spam-Level: X-Spam-Status: No, score=-4.22 tagged_above=-999 required=6.31 tests=[HK_RANDOM_FROM=0.001, KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd3-us-west.apache.org [10.40.0.10]) (amavisd-new, port 10024) with ESMTP id K-7QoYgzemaM for ; Tue, 11 Jul 2017 15:50:33 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id E729F623C6 for ; Tue, 11 Jul 2017 15:50:31 +0000 (UTC) Received: (qmail 18296 invoked by uid 99); 11 Jul 2017 15:50:31 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 11 Jul 2017 15:50:31 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 07260E5E44; Tue, 11 Jul 2017 15:50:31 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mxmrlv@apache.org To: commits@ariatosca.incubator.apache.org Message-Id: <6bd4c8baf0b8472fbf392a4977a28c73@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-ariatosca git commit: ARIA-237 Support for resuming failed workflow executions [Forced Update!] Date: Tue, 11 Jul 2017 15:50:31 +0000 (UTC) archived-at: Tue, 11 Jul 2017 15:50:40 -0000 Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions 07df95f47 -> 1b1e98d39 (forced update) ARIA-237 Support for resuming failed workflow executions Support for resuming failed workflow. It is now possible to rerun failed tasks. Additional changes: * When a task succeeds, the attempt_counter is moved forward. * Fixed an issue with the cli usage of resumable workflows. Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/1b1e98d3 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/1b1e98d3 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/1b1e98d3 Branch: refs/heads/ARIA-237-Support-for-resuming-failed-workflow-executions Commit: 1b1e98d391e808e5cfd36e033d8cbf5442d20410 Parents: 4c789f9 Author: max-orlov Authored: Sun Jul 2 21:43:43 2017 +0300 Committer: max-orlov Committed: Tue Jul 11 18:49:50 2017 +0300 ---------------------------------------------------------------------- aria/cli/commands/executions.py | 10 +- aria/cli/core/aria.py | 6 + aria/cli/helptexts.py | 1 + aria/modeling/orchestration.py | 4 +- aria/orchestrator/workflow_runner.py | 8 +- aria/orchestrator/workflows/core/engine.py | 9 +- .../workflows/core/events_handler.py | 10 +- tests/modeling/test_models.py | 3 +- tests/orchestrator/test_workflow_runner.py | 198 +++++++++++++++---- 9 files changed, 196 insertions(+), 53 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b1e98d3/aria/cli/commands/executions.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py index ea70af5..4783442 100644 --- a/aria/cli/commands/executions.py +++ b/aria/cli/commands/executions.py @@ -157,10 +157,8 @@ def start(workflow_name, @executions.command(name='resume', short_help='Resume a stopped execution') @aria.argument('execution-id') -@aria.options.inputs(help=helptexts.EXECUTION_INPUTS) @aria.options.dry_execution -@aria.options.task_max_attempts() -@aria.options.task_retry_interval() +@aria.options.retry_failed_tasks @aria.options.mark_pattern() @aria.options.verbose() @aria.pass_model_storage @@ -168,9 +166,8 @@ def start(workflow_name, @aria.pass_plugin_manager @aria.pass_logger def resume(execution_id, + retry_failed_tasks, dry, - task_max_attempts, - task_retry_interval, mark_pattern, model_storage, resource_storage, @@ -194,8 +191,7 @@ def resume(execution_id, workflow_runner = \ WorkflowRunner( model_storage, resource_storage, plugin_manager, - execution_id=execution_id, executor=executor, - task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval + execution_id=execution_id, retry_failed_tasks=retry_failed_tasks, executor=executor, ) logger.info('Resuming {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else '')) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b1e98d3/aria/cli/core/aria.py ---------------------------------------------------------------------- diff --git a/aria/cli/core/aria.py b/aria/cli/core/aria.py index 515c06a..b84507c 100644 --- a/aria/cli/core/aria.py +++ b/aria/cli/core/aria.py @@ -310,6 +310,12 @@ class Options(object): is_flag=True, help=helptexts.DRY_EXECUTION) + self.retry_failed_tasks = click.option( + '--retry-failed-tasks', + is_flag=True, + help=helptexts.RETRY_FAILED_TASK + ) + self.reset_config = click.option( '--reset-config', is_flag=True, http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b1e98d3/aria/cli/helptexts.py ---------------------------------------------------------------------- diff --git a/aria/cli/helptexts.py b/aria/cli/helptexts.py index a5d41e8..5ab353a 100644 --- a/aria/cli/helptexts.py +++ b/aria/cli/helptexts.py @@ -46,6 +46,7 @@ TASK_MAX_ATTEMPTS = \ "How many times should a task be attempted in case of failures [default: {0}]" DRY_EXECUTION = "Execute a workflow dry run (prints operations information without causing side " \ "effects)" +RETRY_FAILED_TASK = "Retry tasks that failed in the previous execution attempt" IGNORE_AVAILABLE_NODES = "Delete the service even if it has available nodes" SORT_BY = "Key for sorting the list" DESCENDING = "Sort list in descending order [default: False]" http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b1e98d3/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index df2643e..4d4f0fe 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -65,7 +65,9 @@ class ExecutionBase(mixins.ModelMixin): PENDING: (STARTED, CANCELLED), STARTED: END_STATES + (CANCELLING,), CANCELLING: END_STATES, - CANCELLED: PENDING + # Retrying + CANCELLED: PENDING, + FAILED: PENDING } # region one_to_many relationships http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b1e98d3/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 47270c0..a85e7d3 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -38,7 +38,8 @@ DEFAULT_TASK_RETRY_INTERVAL = 30 class WorkflowRunner(object): def __init__(self, model_storage, resource_storage, plugin_manager, - execution_id=None, service_id=None, workflow_name=None, inputs=None, executor=None, + execution_id=None, retry_failed_tasks=False, + service_id=None, workflow_name=None, inputs=None, executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS, task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL): """ @@ -62,6 +63,7 @@ class WorkflowRunner(object): "and service id with inputs") self._is_resume = execution_id is not None + self._retry_failed_tasks = retry_failed_tasks self._model_storage = model_storage self._resource_storage = resource_storage @@ -116,7 +118,9 @@ class WorkflowRunner(object): return self._model_storage.service.get(self._service_id) def execute(self): - self._engine.execute(ctx=self._workflow_context, resuming=self._is_resume) + self._engine.execute(ctx=self._workflow_context, + resuming=self._is_resume, + retry_failed=self._retry_failed_tasks) def cancel(self): self._engine.cancel_execution(ctx=self._workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b1e98d3/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index d9c77e9..0ec3cd8 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -41,14 +41,15 @@ class Engine(logger.LoggerMixin): self._executors = executors.copy() self._executors.setdefault(StubTaskExecutor, StubTaskExecutor()) - def execute(self, ctx, resuming=False): + def execute(self, ctx, resuming=False, retry_failed=False): """ Executes the workflow. """ if resuming: - events.on_resume_workflow_signal.send(ctx) + events.on_resume_workflow_signal.send(ctx, retry_failed=retry_failed) tasks_tracker = _TasksTracker(ctx) + try: events.start_workflow_signal.send(ctx) while True: @@ -124,8 +125,10 @@ class Engine(logger.LoggerMixin): class _TasksTracker(object): + def __init__(self, ctx): self._ctx = ctx + self._tasks = ctx.execution.tasks self._executed_tasks = [task for task in self._tasks if task.has_ended()] self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks)) @@ -155,7 +158,7 @@ class _TasksTracker(object): def executable_tasks(self): now = datetime.utcnow() # we need both lists since retrying task are in the executing task list. - for task in self._update_tasks(self._executing_tasks + self._executable_tasks): + for task in self._update_tasks(set(self._executing_tasks + self._executable_tasks)): if all([task.is_waiting(), task.due_at <= now, all(dependency in self._executed_tasks for dependency in task.dependencies) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b1e98d3/aria/orchestrator/workflows/core/events_handler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py index 37801de..219d2df 100644 --- a/aria/orchestrator/workflows/core/events_handler.py +++ b/aria/orchestrator/workflows/core/events_handler.py @@ -71,6 +71,7 @@ def _task_succeeded(ctx, *args, **kwargs): with ctx.persist_changes: ctx.task.ended_at = datetime.utcnow() ctx.task.status = ctx.task.SUCCESS + ctx.task.attempts_count += 1 _update_node_state_if_necessary(ctx) @@ -119,7 +120,7 @@ def _workflow_cancelled(workflow_context, *args, **kwargs): @events.on_resume_workflow_signal.connect -def _workflow_resume(workflow_context, *args, **kwargs): +def _workflow_resume(workflow_context, retry_failed=False, *args, **kwargs): with workflow_context.persist_changes: execution = workflow_context.execution execution.status = execution.PENDING @@ -128,6 +129,13 @@ def _workflow_resume(workflow_context, *args, **kwargs): if not task.has_ended(): task.status = task.PENDING + if retry_failed: + for task in execution.tasks: + if task.status == task.FAILED and not task.ignore_failure: + task.attempts_count = 0 + task.status = task.PENDING + + @events.on_cancelling_workflow_signal.connect def _workflow_cancelling(workflow_context, *args, **kwargs): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b1e98d3/tests/modeling/test_models.py ---------------------------------------------------------------------- diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py index e1167fc..25b4080 100644 --- a/tests/modeling/test_models.py +++ b/tests/modeling/test_models.py @@ -324,8 +324,7 @@ class TestExecution(object): Execution.STARTED: [Execution.PENDING], Execution.CANCELLING: [Execution.PENDING, Execution.STARTED], - Execution.FAILED: [Execution.PENDING, - Execution.STARTED, + Execution.FAILED: [Execution.STARTED, Execution.SUCCEEDED, Execution.CANCELLED, Execution.CANCELLING], http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/1b1e98d3/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index a77d727..30176ae 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -51,7 +51,7 @@ custom_events = { 'is_resumed': Event(), 'is_active': Event(), 'execution_cancelled': Event(), - 'execution_ended': Event() + 'execution_failed': Event(), } @@ -166,7 +166,8 @@ def test_execute(request, service): assert engine_kwargs['ctx'].execution.workflow_name == 'test_workflow' mock_engine_execute.assert_called_once_with(ctx=workflow_runner._workflow_context, - resuming=False) + resuming=False, + retry_failed=False) def test_cancel_execution(request): @@ -358,10 +359,11 @@ class TestResumableWorkflows(object): def test_resume_workflow(self, workflow_context, thread_executor): node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) - self._create_interface(workflow_context, node, mock_resuming_task) + self._create_interface(workflow_context, node, mock_pass_first_task_only) wf_runner = self._create_initial_workflow_runner( - workflow_context, mock_parallel_workflow, thread_executor) + workflow_context, mock_parallel_tasks_workflow, thread_executor, + inputs={'number_of_tasks': 2}) wf_thread = Thread(target=wf_runner.execute) wf_thread.daemon = True @@ -369,6 +371,7 @@ class TestResumableWorkflows(object): # Wait for the execution to start self._wait_for_active_and_cancel(wf_runner) + node = workflow_context.model.node.refresh(node) tasks = workflow_context.model.task.list(filters={'_stub_type': None}) assert any(task.status == task.SUCCESS for task in tasks) @@ -390,6 +393,7 @@ class TestResumableWorkflows(object): new_wf_runner.execute() # Wait for it to finish and assert changes. + node = workflow_context.model.node.refresh(node) assert all(task.status == task.SUCCESS for task in tasks) assert node.attributes['invocations'].value == 3 assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED @@ -400,13 +404,15 @@ class TestResumableWorkflows(object): self._create_interface(workflow_context, node, mock_stuck_task) wf_runner = self._create_initial_workflow_runner( - workflow_context, mock_single_task_workflow, thread_executor) + workflow_context, mock_parallel_tasks_workflow, thread_executor, + inputs={'number_of_tasks': 1}) wf_thread = Thread(target=wf_runner.execute) wf_thread.daemon = True wf_thread.start() self._wait_for_active_and_cancel(wf_runner) + node = workflow_context.model.node.refresh(node) task = workflow_context.model.task.list(filters={'_stub_type': None})[0] assert node.attributes['invocations'].value == 1 assert task.status == task.STARTED @@ -430,6 +436,7 @@ class TestResumableWorkflows(object): new_thread_executor.close() # Wait for it to finish and assert changes. + node = workflow_context.model.node.refresh(node) assert node.attributes['invocations'].value == 2 assert task.status == task.SUCCESS assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED @@ -439,13 +446,15 @@ class TestResumableWorkflows(object): node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) self._create_interface(workflow_context, node, mock_failed_before_resuming) - wf_runner = self._create_initial_workflow_runner( - workflow_context, mock_single_task_workflow, thread_executor) + wf_runner = self._create_initial_workflow_runner(workflow_context, + mock_parallel_tasks_workflow, + thread_executor) wf_thread = Thread(target=wf_runner.execute) wf_thread.setDaemon(True) wf_thread.start() self._wait_for_active_and_cancel(wf_runner) + node = workflow_context.model.node.refresh(node) task = workflow_context.model.task.list(filters={'_stub_type': None})[0] assert node.attributes['invocations'].value == 2 @@ -474,10 +483,114 @@ class TestResumableWorkflows(object): new_thread_executor.close() # Wait for it to finish and assert changes. + node = workflow_context.model.node.refresh(node) assert node.attributes['invocations'].value == task.max_attempts - 1 assert task.status == task.SUCCESS assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + def test_resume_failed_task_and_successful_task(self, workflow_context, thread_executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_pass_first_task_only) + + wf_runner = self._create_initial_workflow_runner( + workflow_context, + mock_parallel_tasks_workflow, + thread_executor, + inputs={'retry_interval': 1, 'max_attempts': 2, 'number_of_tasks': 2} + ) + wf_thread = Thread(target=wf_runner.execute) + wf_thread.setDaemon(True) + wf_thread.start() + + if custom_events['execution_failed'].wait(60) is False: + raise TimeoutError("Execution did not end") + + tasks = workflow_context.model.task.list(filters={'_stub_type': None}) + node = workflow_context.model.node.refresh(node) + assert node.attributes['invocations'].value == 3 + failed_task = [t for t in tasks if t.status == t.FAILED][0] + + # First task passes + assert any(task.status == task.FAILED for task in tasks) + assert failed_task.attempts_count == 2 + # Second task fails + assert any(task.status == task.SUCCESS for task in tasks) + assert wf_runner.execution.status in wf_runner.execution.FAILED + + custom_events['is_resumed'].set() + new_thread_executor = thread.ThreadExecutor() + try: + new_wf_runner = WorkflowRunner( + service_id=wf_runner.service.id, + retry_failed_tasks=True, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + execution_id=wf_runner.execution.id, + executor=new_thread_executor) + + new_wf_runner.execute() + finally: + new_thread_executor.close() + + # Wait for it to finish and assert changes. + node = workflow_context.model.node.refresh(node) + assert failed_task.attempts_count == 1 + assert node.attributes['invocations'].value == 4 + assert all(task.status == task.SUCCESS for task in tasks) + assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + + def test_two_sequential_task_first_task_failed(self, workflow_context, thread_executor): + node = workflow_context.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) + node.attributes['invocations'] = models.Attribute.wrap('invocations', 0) + self._create_interface(workflow_context, node, mock_fail_first_task_only) + + wf_runner = self._create_initial_workflow_runner( + workflow_context, + mock_sequential_tasks_workflow, + thread_executor, + inputs={'retry_interval': 1, 'max_attempts': 1, 'number_of_tasks': 2} + ) + wf_thread = Thread(target=wf_runner.execute) + wf_thread.setDaemon(True) + wf_thread.start() + + if custom_events['execution_failed'].wait(60) is False: + raise TimeoutError("Execution did not end") + + tasks = workflow_context.model.task.list(filters={'_stub_type': None}) + node = workflow_context.model.node.refresh(node) + assert node.attributes['invocations'].value == 1 + assert any(t.status == t.FAILED for t in tasks) + assert any(t.status == t.PENDING for t in tasks) + + custom_events['is_resumed'].set() + new_thread_executor = thread.ThreadExecutor() + try: + new_wf_runner = WorkflowRunner( + service_id=wf_runner.service.id, + inputs={}, + model_storage=workflow_context.model, + resource_storage=workflow_context.resource, + plugin_manager=None, + execution_id=wf_runner.execution.id, + executor=new_thread_executor) + + new_wf_runner.execute() + finally: + new_thread_executor.close() + + # Wait for it to finish and assert changes. + node = workflow_context.model.node.refresh(node) + assert node.attributes['invocations'].value == 2 + assert any(t.status == t.SUCCESS for t in tasks) + assert any(t.status == t.FAILED for t in tasks) + assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED + + + @staticmethod @pytest.fixture def thread_executor(): @@ -524,51 +637,42 @@ class TestResumableWorkflows(object): def execution_cancelled(*args, **kwargs): custom_events['execution_cancelled'].set() - def execution_ended(*args, **kwargs): - custom_events['execution_ended'].set() + def execution_failed(*args, **kwargs): + custom_events['execution_failed'].set() events.on_cancelled_workflow_signal.connect(execution_cancelled) - events.on_failure_workflow_signal.connect(execution_ended) + events.on_failure_workflow_signal.connect(execution_failed) yield events.on_cancelled_workflow_signal.disconnect(execution_cancelled) - events.on_failure_workflow_signal.disconnect(execution_ended) + events.on_failure_workflow_signal.disconnect(execution_failed) for event in custom_events.values(): event.clear() @workflow -def mock_parallel_workflow(ctx, graph): +def mock_sequential_tasks_workflow(ctx, graph, + retry_interval=1, max_attempts=10, number_of_tasks=1): node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) - graph.add_tasks( - api.task.OperationTask( - node, interface_name='aria.interfaces.lifecycle', operation_name='create'), - api.task.OperationTask( - node, interface_name='aria.interfaces.lifecycle', operation_name='create') - ) - - -@operation -def mock_resuming_task(ctx): - ctx.node.attributes['invocations'] += 1 - - if ctx.node.attributes['invocations'] != 1: - custom_events['is_active'].set() - if not custom_events['is_resumed'].isSet(): - # if resume was called, increase by one. o/w fail the execution - second task should - # fail as long it was not a part of resuming the workflow - raise FailingTask("wasn't resumed yet") + graph.sequence(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks)) @workflow -def mock_single_task_workflow(ctx, graph): +def mock_parallel_tasks_workflow(ctx, graph, + retry_interval=1, max_attempts=10, number_of_tasks=1): node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME) - graph.add_tasks( + graph.add_tasks(*_create_tasks(node, retry_interval, max_attempts, number_of_tasks)) + + +def _create_tasks(node, retry_interval, max_attempts, number_of_tasks): + return [ api.task.OperationTask(node, - interface_name='aria.interfaces.lifecycle', - operation_name='create', - retry_interval=1, - max_attempts=10), - ) + 'aria.interfaces.lifecycle', + 'create', + retry_interval=retry_interval, + max_attempts=max_attempts) + for _ in xrange(number_of_tasks) + ] + @operation @@ -600,3 +704,23 @@ def mock_stuck_task(ctx): if not custom_events['is_active'].isSet(): custom_events['is_active'].set() time.sleep(5) + + +@operation +def mock_pass_first_task_only(ctx): + ctx.node.attributes['invocations'] += 1 + + if ctx.node.attributes['invocations'] != 1: + custom_events['is_active'].set() + if not custom_events['is_resumed'].isSet(): + # if resume was called, increase by one. o/w fail the execution - second task should + # fail as long it was not a part of resuming the workflow + raise FailingTask("wasn't resumed yet") + + +@operation +def mock_fail_first_task_only(ctx): + ctx.node.attributes['invocations'] += 1 + + if not custom_events['is_resumed'].isSet() and ctx.node.attributes['invocations'] == 1: + raise FailingTask("First task should fail")