ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: wip
Date Tue, 20 Jun 2017 16:27:24 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-236-Resumable-workflow-executions 1fee85c41 -> 8fdea04ff


wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/8fdea04f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/8fdea04f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/8fdea04f

Branch: refs/heads/ARIA-236-Resumable-workflow-executions
Commit: 8fdea04ff2ba38855d415642d245a020ff4305de
Parents: 1fee85c
Author: max-orlov <maxim@gigaspaces.com>
Authored: Mon Jun 19 17:44:45 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Tue Jun 20 19:27:20 2017 +0300

----------------------------------------------------------------------
 aria/cli/commands/executions.py                 |  55 ++++++-
 aria/cli/logger.py                              |   4 +-
 aria/modeling/orchestration.py                  |   3 +-
 aria/orchestrator/context/workflow.py           |   5 +
 aria/orchestrator/events.py                     |   1 +
 aria/orchestrator/exceptions.py                 |   7 +
 aria/orchestrator/workflow_runner.py            |  41 +++--
 aria/orchestrator/workflows/core/engine.py      |   4 +
 .../workflows/core/events_handler.py            |   7 +
 tests/mock/__init__.py                          |   2 +-
 tests/mock/models.py                            |  14 +-
 tests/modeling/test_models.py                   |   5 +-
 tests/orchestrator/test_workflow_runner.py      | 157 +++++++++++++++++--
 13 files changed, 262 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/aria/cli/commands/executions.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/executions.py b/aria/cli/commands/executions.py
index 6176ea2..f2f8221 100644
--- a/aria/cli/commands/executions.py
+++ b/aria/cli/commands/executions.py
@@ -134,18 +134,63 @@ def start(workflow_name,
     executor = DryExecutor() if dry else None  # use WorkflowRunner's default executor
 
     workflow_runner = \
-        WorkflowRunner(workflow_name, service.id, inputs,
-                       model_storage, resource_storage, plugin_manager,
-                       executor, task_max_attempts, task_retry_interval)
+        WorkflowRunner(
+            inputs, model_storage, resource_storage, plugin_manager,
+            service_id=service.id, workflow_name=workflow_name, executor=executor,
+            task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
+        )
 
-    execution_thread_name = '{0}_{1}'.format(service_name, workflow_name)
+    _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+
+
+@executions.command(name='resume',
+                    short_help='Resume a workflow')
+@aria.argument('execution-id')
+@aria.options.inputs(help=helptexts.EXECUTION_INPUTS)
+@aria.options.dry_execution
+@aria.options.task_max_attempts()
+@aria.options.task_retry_interval()
+@aria.options.mark_pattern()
+@aria.options.verbose()
+@aria.pass_model_storage
+@aria.pass_resource_storage
+@aria.pass_plugin_manager
+@aria.pass_logger
+def resume(execution_id,
+           inputs,
+           dry,
+           task_max_attempts,
+           task_retry_interval,
+           mark_pattern,
+           model_storage,
+           resource_storage,
+           plugin_manager,
+           logger):
+    executor = DryExecutor() if dry else None  # use WorkflowRunner's default executor
+
+    workflow_runner = \
+        WorkflowRunner(
+            inputs, model_storage, resource_storage, plugin_manager,
+            execution_id=execution_id, executor=executor,
+            task_max_attempts=task_max_attempts, task_retry_interval=task_retry_interval
+        )
+
+    _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern)
+
+
+def _run_execution(workflow_runner, logger, model_storage, dry, mark_pattern):
+    execution_thread_name = '{0}_{1}'.format(workflow_runner.service.name,
+                                             workflow_runner.execution.workflow_name)
     execution_thread = threading.ExceptionThread(target=workflow_runner.execute,
                                                  name=execution_thread_name)
 
     logger.info('Starting {0}execution. Press Ctrl+C cancel'.format('dry ' if dry else ''))
     execution_thread.start()
 
-    log_iterator = cli_logger.ModelLogIterator(model_storage, workflow_runner.execution_id)
+    last_task_id = workflow_runner.execution.logs[-1].id if workflow_runner.execution.logs
else 0
+    log_iterator = cli_logger.ModelLogIterator(model_storage,
+                                               workflow_runner.execution_id,
+                                               offset=last_task_id)
     try:
         while execution_thread.is_alive():
             execution_logging.log_list(log_iterator, mark_pattern=mark_pattern)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/aria/cli/logger.py
----------------------------------------------------------------------
diff --git a/aria/cli/logger.py b/aria/cli/logger.py
index 5de3701..96f3fb3 100644
--- a/aria/cli/logger.py
+++ b/aria/cli/logger.py
@@ -115,8 +115,8 @@ class Logging(object):
 
 class ModelLogIterator(object):
 
-    def __init__(self, model_storage, execution_id, filters=None, sort=None):
-        self._last_visited_id = 0
+    def __init__(self, model_storage, execution_id, filters=None, sort=None, offset=0):
+        self._last_visited_id = offset
         self._model_storage = model_storage
         self._execution_id = execution_id
         self._additional_filters = filters or {}

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 17d2476..276b68e 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -68,7 +68,8 @@ class ExecutionBase(mixins.ModelMixin):
     VALID_TRANSITIONS = {
         PENDING: (STARTED, CANCELLED),
         STARTED: END_STATES + (CANCELLING,),
-        CANCELLING: END_STATES
+        CANCELLING: END_STATES,
+        CANCELLED: PENDING
     }
 
     @orm.validates('status')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index aa5a786..adcd635 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -97,10 +97,15 @@ class WorkflowContext(BaseContext):
 
     @property
     def _graph(self):
+        # Constructing a graph with only not ended nodes
         if self._execution_graph is None:
             graph = DiGraph()
             for task in self.execution.tasks:
+                if task.has_ended():
+                    continue
                 for dependency in task.dependencies:
+                    if dependency.has_ended():
+                        continue
                     graph.add_edge(dependency, task)
 
             self._execution_graph = graph

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index a1c4922..aa1b5bc 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -34,3 +34,4 @@ on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
 on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
 on_success_workflow_signal = signal('on_success_workflow_signal')
 on_failure_workflow_signal = signal('on_failure_workflow_signal')
+on_resume_workflow_signal = signal('on_resume_workflow_signal')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index 8d3dcc6..71b6401 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -74,3 +74,10 @@ class WorkflowImplementationNotFoundError(AriaError):
     Raised when attempting to import a workflow's code but the implementation is not found
     """
     pass
+
+
+class InvalidWorkflowRunnerParams(AriaError):
+    """
+    Raised when invalid combination of arguments is passed to the workflow runner
+    """
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index 9e6b3ad..6e32ee1 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -37,9 +37,9 @@ DEFAULT_TASK_RETRY_INTERVAL = 30
 
 class WorkflowRunner(object):
 
-    def __init__(self, workflow_name, service_id, inputs,
-                 model_storage, resource_storage, plugin_manager,
-                 executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+    def __init__(self, inputs, model_storage, resource_storage, plugin_manager,
+                 execution_id=None, service_id=None, workflow_name=None, executor=None,
+                 task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
                  task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
         """
         Manages a single workflow execution on a given service.
@@ -57,26 +57,29 @@ class WorkflowRunner(object):
 
         self._model_storage = model_storage
         self._resource_storage = resource_storage
-        self._workflow_name = workflow_name
 
         # the IDs are stored rather than the models themselves, so this module could be used
         # by several threads without raising errors on model objects shared between threads
-        self._service_id = service_id
 
-        self._validate_workflow_exists_for_service()
-
-        workflow_fn = self._get_workflow_fn()
-
-        execution = self._create_execution_model(inputs)
-        self._execution_id = execution.id
+        if workflow_name is not None and service_id is not None and execution_id is None:
+            self._service_id = service_id
+            self._workflow_name = workflow_name
+            self._validate_workflow_exists_for_service()
+            self._execution_id = self._create_execution_model(inputs).id
+        elif execution_id is not None:
+            self._execution_id = execution_id
+            self._service_id = self.execution.service.id
+            self._workflow_name = model_storage.execution.get(self._execution_id).workflow_name
+        else:
+            raise exceptions.InvalidWorkflowRunnerParams("")
 
         self._workflow_context = WorkflowContext(
             name=self.__class__.__name__,
             model_storage=self._model_storage,
             resource_storage=resource_storage,
             service_id=service_id,
-            execution_id=execution.id,
-            workflow_name=workflow_name,
+            execution_id=self._execution_id,
+            workflow_name=self._workflow_name,
             task_max_attempts=task_max_attempts,
             task_retry_interval=task_retry_interval)
 
@@ -86,12 +89,18 @@ class WorkflowRunner(object):
         # transforming the execution inputs to dict, to pass them to the workflow function
         execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
 
-        self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
-        compile.create_execution_tasks(
-            self._workflow_context, self._tasks_graph, executor.__class__)
+        if execution_id is None:
+            # Not an existing execution
+            workflow_fn = self._get_workflow_fn()
+            tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
+            compile.create_execution_tasks(
+                self._workflow_context, self._tasks_graph, executor.__class__)
 
         self._engine = engine.Engine(executors={executor.__class__: executor})
 
+        if workflow_name is None:
+            self._engine.resume_execution(self._workflow_context)
+
     @property
     def execution_id(self):
         return self._execution_id

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 9f0ddd7..7f14500 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -69,6 +69,10 @@ class Engine(logger.LoggerMixin):
             raise
 
     @staticmethod
+    def resume_execution(ctx):
+        events.on_resume_workflow_signal.send(ctx)
+
+    @staticmethod
     def cancel_execution(ctx):
         """
         Send a cancel request to the engine. If execution already started, execution status

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 2d71d2a..e67963f 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -121,6 +121,13 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
             execution.ended_at = datetime.utcnow()
 
 
+@events.on_resume_workflow_signal.connect
+def _workflow_resume(workflow_context, *args, **kwargs):
+    with workflow_context.track_changes:
+        execution = workflow_context.execution
+        execution.status = execution.PENDING
+
+
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):
     with workflow_context.persist_changes:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/tests/mock/__init__.py
----------------------------------------------------------------------
diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py
index 9004b4c..9183b77 100644
--- a/tests/mock/__init__.py
+++ b/tests/mock/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from . import models, context, topology, operations
+from . import models, context, topology, operations, workflow

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index 7f6bbea..23a14bd 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -225,20 +225,24 @@ def create_interface_template(service_template, interface_name, operation_name,
     )
 
 
-def create_interface(service, interface_name, operation_name, operation_kwargs=None,
-                     interface_kwargs=None):
-    the_type = service.service_template.interface_types.get_descendant('test_interface_type')
-
+def create_operation(operation_name, operation_kwargs=None):
     if operation_kwargs and operation_kwargs.get('arguments'):
         operation_kwargs['arguments'] = dict(
             (argument_name, models.Argument.wrap(argument_name, argument_value))
             for argument_name, argument_value in operation_kwargs['arguments'].iteritems()
             if argument_value is not None)
 
-    operation = models.Operation(
+    return models.Operation(
         name=operation_name,
         **(operation_kwargs or {})
     )
+
+
+def create_interface(service, interface_name, operation_name, operation_kwargs=None,
+                     interface_kwargs=None):
+    the_type = service.service_template.interface_types.get_descendant('test_interface_type')
+    operation = create_operation(operation_name, operation_kwargs)
+
     return models.Interface(
         type=the_type,
         operations=_dictify(operation),

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index 464f432..bbc7352 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -314,7 +314,7 @@ class TestExecution(object):
                                    Execution.CANCELLING],
             Execution.FAILED: [Execution.FAILED],
             Execution.SUCCEEDED: [Execution.SUCCEEDED],
-            Execution.CANCELLED: [Execution.CANCELLED]
+            Execution.CANCELLED: [Execution.CANCELLED, Execution.PENDING]
         }
 
         invalid_transitions = {
@@ -334,8 +334,7 @@ class TestExecution(object):
                                   Execution.FAILED,
                                   Execution.CANCELLED,
                                   Execution.CANCELLING],
-            Execution.CANCELLED: [Execution.PENDING,
-                                  Execution.STARTED,
+            Execution.CANCELLED: [Execution.STARTED,
                                   Execution.FAILED,
                                   Execution.SUCCEEDED,
                                   Execution.CANCELLING],

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8fdea04f/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index 40f9035..3fa30f6 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -14,21 +14,30 @@
 # limitations under the License.
 
 import json
+from threading import Thread
 from datetime import datetime
 
-import pytest
 import mock
+import pytest
 
 from aria.modeling import exceptions as modeling_exceptions
 from aria.modeling import models
 from aria.orchestrator import exceptions
 from aria.orchestrator.workflow_runner import WorkflowRunner
 from aria.orchestrator.workflows.executor.process import ProcessExecutor
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.executor import thread
+from aria.orchestrator import (
+    workflow,
+    operation,
+)
 
-from ..mock import (
-    topology,
-    workflow as workflow_mocks
+from tests import (
+    mock as tests_mock,
+    storage
 )
+
 from ..fixtures import (  # pylint: disable=unused-import
     plugins_dir,
     plugin_manager,
@@ -36,6 +45,8 @@ from ..fixtures import (  # pylint: disable=unused-import
     resource_storage as resource
 )
 
+global_test_holder = {}
+
 
 def test_undeclared_workflow(request):
     # validating a proper error is raised when the workflow is not declared in the service
@@ -59,8 +70,8 @@ def test_builtin_workflow_instantiation(request):
     # validates the workflow runner instantiates properly when provided with a builtin workflow
     # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
     workflow_runner = _create_workflow_runner(request, 'install')
-    tasks = list(workflow_runner._tasks_graph.tasks)
-    assert len(tasks) == 2  # expecting two WorkflowTasks
+    tasks = list(workflow_runner.execution.tasks)
+    assert len(tasks) == 18  # expecting 18 tasks for 2 node topology
 
 
 def test_custom_workflow_instantiation(request):
@@ -68,8 +79,8 @@ def test_custom_workflow_instantiation(request):
     # (expecting no errors to be raised on undeclared workflow or missing workflow implementation)
     mock_workflow = _setup_mock_workflow_in_service(request)
     workflow_runner = _create_workflow_runner(request, mock_workflow)
-    tasks = list(workflow_runner._tasks_graph.tasks)
-    assert len(tasks) == 0  # mock workflow creates no tasks
+    tasks = list(workflow_runner.execution.tasks)
+    assert len(tasks) == 2  # mock workflow creates only start workflow and end workflow
task
 
 
 def test_existing_active_executions(request, service, model):
@@ -240,7 +251,7 @@ def test_workflow_function_parameters(request, tmpdir):
 @pytest.fixture
 def service(model):
     # sets up a service in the storage
-    service_id = topology.create_simple_topology_two_nodes(model)
+    service_id = tests_mock.topology.create_simple_topology_two_nodes(model)
     service = model.service.get(service_id)
     return service
 
@@ -251,7 +262,7 @@ def _setup_mock_workflow_in_service(request, inputs=None):
     service = request.getfuncargvalue('service')
     resource = request.getfuncargvalue('resource')
 
-    source = workflow_mocks.__file__
+    source = tests_mock.workflow.__file__
     resource.service_template.upload(str(service.service_template.id), source)
     mock_workflow_name = 'test_workflow'
     arguments = {}
@@ -293,3 +304,129 @@ def _create_workflow_runner(request, workflow_name, inputs=None, executor=None,
         resource_storage=resource,
         plugin_manager=plugin_manager,
         **task_configuration_kwargs)
+
+
+class TestResumableWorkflows(object):
+
+    def test_resume_workflow(self, workflow_context, executor):
+        self._create_interface(workflow_context, mock_success_task)
+
+        service = workflow_context.service
+        service.workflows['custom_workflow'] = tests_mock.models.create_operation(
+            'custom_workflow',
+            operation_kwargs={'function': '{0}.{1}'.format(__name__, mock_workflow.__name__)}
+        )
+        workflow_context.model.service.update(service)
+
+        wf_runner = WorkflowRunner(
+            service_id=workflow_context.service.id,
+            inputs={},
+            model_storage=workflow_context.model,
+            resource_storage=workflow_context.resource,
+            plugin_manager=None,
+            workflow_name='custom_workflow',
+            executor=executor)
+        wf_thread = Thread(target=wf_runner.execute)
+        try:
+            wf_thread.start()
+
+            # Wait for the execution to start
+            while global_test_holder.get('state') != 'active':
+                pass
+            global_test_holder['state'] = 'terminated'
+            wf_runner.cancel()
+
+            # Make sure the execution was canceled and the task has not ended
+            while wf_runner.execution.status != workflow_context.execution.CANCELLED:
+                pass
+            task = workflow_context.model.task.list(filters={'stub_type': None})[0]
+            assert task.status in (task.FAILED, task.RETRYING)
+            assert global_test_holder['state'] == 'idle'
+
+            # Create a new workflow runner, with an existing execution id. This would cause
+            # the old execution to restart.
+            new_wf_runner = WorkflowRunner(
+                service_id=wf_runner.service.id,
+                inputs={},
+                model_storage=workflow_context.model,
+                resource_storage=workflow_context.resource,
+                plugin_manager=None,
+                execution_id=wf_runner.execution.id,
+                executor=executor)
+
+            # Set the resumed to True, for the execution to succeed.
+            global_test_holder['resumed'] = True
+            new_wf_runner.execute()
+
+            # Wait for it to finish and assert changes.
+            while global_test_holder.get('state') != 'ended':
+                pass
+            assert task.status == task.SUCCESS
+            assert wf_runner.execution.status == wf_runner.execution.SUCCEEDED
+
+        except:
+            global_test_holder['state'] = 'terminated'
+            wf_thread.join(0.5)
+            raise
+
+    @staticmethod
+    @pytest.fixture
+    def executor():
+        result = thread.ThreadExecutor()
+        try:
+            yield result
+        finally:
+            result.close()
+
+    @staticmethod
+    @pytest.fixture
+    def workflow_context(tmpdir):
+        workflow_context = tests_mock.context.simple(str(tmpdir))
+        yield workflow_context
+        storage.release_sqlite_storage(workflow_context.model)
+
+    @staticmethod
+    def _create_interface(ctx, func, arguments=None):
+        node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+        interface_name = 'aria.interfaces.lifecycle'
+        operation_kwargs = dict(function='{name}.{func.__name__}'.format(
+            name=__name__, func=func))
+        if arguments:
+            # the operation has to declare the arguments before those may be passed
+            operation_kwargs['arguments'] = arguments
+        operation_name = 'create'
+        interface = tests_mock.models.create_interface(node.service, interface_name, operation_name,
+                                                       operation_kwargs=operation_kwargs)
+        node.interfaces[interface.name] = interface
+        ctx.model.node.update(node)
+
+        return node, interface_name, operation_name
+
+    @staticmethod
+    def _engine(workflow_func, workflow_context, executor):
+        graph = workflow_func(ctx=workflow_context)
+        execution = workflow_context.execution
+        engine.construct_execution_tasks(execution, graph, executor.__class__)
+        workflow_context.execution = execution
+
+        return engine.Engine(default_executor=executor)
+
+
+@workflow
+def mock_workflow(ctx, graph):
+    node = ctx.model.node.get_by_name(tests_mock.models.DEPENDENCY_NODE_NAME)
+    graph.add_tasks(
+        api.task.OperationTask(
+            node, interface_name='aria.interfaces.lifecycle', operation_name='create')
+    )
+
+
+@operation
+def mock_success_task(**_):
+    global_test_holder['state'] = 'active'
+    while global_test_holder.get('state') != 'terminated':
+        if global_test_holder.get('resumed') is True:
+            global_test_holder['state'] = 'ended'
+            return
+    global_test_holder['state'] = 'idle'
+    raise Exception("The operation was terminated")


Mime
View raw message