Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id A4D58200CC4 for ; Thu, 29 Jun 2017 02:07:00 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id A397F160BF7; Thu, 29 Jun 2017 00:07:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 25D80160BFD for ; Thu, 29 Jun 2017 02:06:58 +0200 (CEST) Received: (qmail 16799 invoked by uid 500); 29 Jun 2017 00:06:58 -0000 Mailing-List: contact commits-help@ariatosca.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ariatosca.incubator.apache.org Delivered-To: mailing list commits@ariatosca.incubator.apache.org Received: (qmail 16789 invoked by uid 99); 29 Jun 2017 00:06:58 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Jun 2017 00:06:58 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id E7F56C0620 for ; Thu, 29 Jun 2017 00:06:57 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id meP83bk1RkEf for ; Thu, 29 Jun 2017 00:06:51 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id 950785FB43 for ; Thu, 29 Jun 2017 00:06:49 +0000 (UTC) Received: (qmail 15974 invoked by uid 99); 29 Jun 2017 00:06:48 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 29 Jun 2017 00:06:48 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A85B9DFAB0; Thu, 29 Jun 2017 00:06:48 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: emblemparade@apache.org To: commits@ariatosca.incubator.apache.org Date: Thu, 29 Jun 2017 00:06:48 -0000 Message-Id: <5441a2b2b81b4d3990027b685dcdf1e9@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [01/15] incubator-ariatosca git commit: ARIA-284 Cleanup and optimize the task execution [Forced Update!] archived-at: Thu, 29 Jun 2017 00:07:00 -0000 Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-286-sphinx-documentation 5be27b0c0 -> 3f09ecdeb (forced update) ARIA-284 Cleanup and optimize the task execution Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a75a3dea Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a75a3dea Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a75a3dea Branch: refs/heads/ARIA-286-sphinx-documentation Commit: a75a3dea06741b73b9949e920e28877633a8bc28 Parents: 75112ab Author: max-orlov Authored: Wed Jun 21 12:41:33 2017 +0300 Committer: max-orlov Committed: Sun Jun 25 14:33:23 2017 +0300 ---------------------------------------------------------------------- aria/modeling/orchestration.py | 2 - aria/orchestrator/context/workflow.py | 19 --- aria/orchestrator/workflow_runner.py | 7 +- aria/orchestrator/workflows/core/compile.py | 116 ------------------ aria/orchestrator/workflows/core/engine.py | 110 ++++++++++------- .../workflows/core/graph_compiler.py | 120 +++++++++++++++++++ tests/orchestrator/context/__init__.py | 4 +- tests/orchestrator/context/test_serialize.py | 4 +- .../orchestrator/execution_plugin/test_local.py | 4 +- tests/orchestrator/execution_plugin/test_ssh.py | 6 +- tests/orchestrator/test_workflow_runner.py | 4 +- .../orchestrator/workflows/core/test_engine.py | 4 +- .../orchestrator/workflows/core/test_events.py | 9 +- .../test_task_graph_into_execution_graph.py | 21 +++- .../executor/test_process_executor_extension.py | 4 +- .../test_process_executor_tracked_changes.py | 4 +- 16 files changed, 229 insertions(+), 209 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/aria/modeling/orchestration.py ---------------------------------------------------------------------- diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py index 276b68e..5b02d1b 100644 --- a/aria/modeling/orchestration.py +++ b/aria/modeling/orchestration.py @@ -306,7 +306,6 @@ class TaskBase(mixins.ModelMixin): ended_at = Column(DateTime, default=None) attempts_count = Column(Integer, default=1) - _api_id = Column(String) _executor = Column(PickleType) _context_cls = Column(PickleType) _stub_type = Column(Enum(*STUB_TYPES)) @@ -442,7 +441,6 @@ class TaskBase(mixins.ModelMixin): 'plugin': api_task.plugin, 'function': api_task.function, 'arguments': api_task.arguments, - '_api_id': api_task.id, '_context_cls': api_task._context_cls, '_executor': executor, } http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index adcd635..18334f3 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -20,8 +20,6 @@ Workflow and operation contexts import threading from contextlib import contextmanager -from networkx import DiGraph - from .exceptions import ContextException from .common import BaseContext @@ -96,23 +94,6 @@ class WorkflowContext(BaseContext): ) @property - def _graph(self): - # Constructing a graph with only not ended nodes - if self._execution_graph is None: - graph = DiGraph() - for task in self.execution.tasks: - if task.has_ended(): - continue - for dependency in task.dependencies: - if dependency.has_ended(): - continue - graph.add_edge(dependency, task) - - self._execution_graph = graph - - return self._execution_graph - - @property @contextmanager def persist_changes(self): yield http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/aria/orchestrator/workflow_runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py index 3ccb1ee..4a50fb2 100644 --- a/aria/orchestrator/workflow_runner.py +++ b/aria/orchestrator/workflow_runner.py @@ -24,7 +24,7 @@ from datetime import datetime from . import exceptions from .context.workflow import WorkflowContext from .workflows import builtin -from .workflows.core import engine, compile +from .workflows.core import engine, graph_compiler from .workflows.executor.process import ProcessExecutor from ..modeling import models from ..modeling import utils as modeling_utils @@ -96,8 +96,9 @@ class WorkflowRunner(object): if not self._is_resume: workflow_fn = self._get_workflow_fn() - tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) - compile.create_execution_tasks(self._workflow_context, tasks_graph, executor.__class__) + self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict) + graph_compiler.GraphCompiler(self._workflow_context, executor.__class__).compile( + self._tasks_graph) self._engine = engine.Engine(executors={executor.__class__: executor}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/aria/orchestrator/workflows/core/compile.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/compile.py b/aria/orchestrator/workflows/core/compile.py deleted file mode 100644 index 932268a..0000000 --- a/aria/orchestrator/workflows/core/compile.py +++ /dev/null @@ -1,116 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -from ....modeling import models -from .. import executor, api - - -def create_execution_tasks(ctx, task_graph, default_executor): - execution = ctx.execution - _construct_execution_tasks(execution, task_graph, default_executor) - ctx.model.execution.update(execution) - return execution.tasks - - -def _construct_execution_tasks(execution, - task_graph, - default_executor, - stub_executor=executor.base.StubTaskExecutor, - start_stub_type=models.Task.START_WORKFLOW, - end_stub_type=models.Task.END_WORKFLOW, - depends_on=()): - """ - Translates the user graph to the execution graph - :param task_graph: The user's graph - :param start_stub_type: internal use - :param end_stub_type: internal use - :param depends_on: internal use - """ - depends_on = list(depends_on) - - # Insert start marker - start_task = models.Task(execution=execution, - dependencies=depends_on, - _api_id=_start_graph_suffix(task_graph.id), - _stub_type=start_stub_type, - _executor=stub_executor) - - for task in task_graph.topological_order(reverse=True): - operation_dependencies = _get_tasks_from_dependencies( - execution, task_graph.get_dependencies(task), [start_task]) - - if isinstance(task, api.task.OperationTask): - models.Task.from_api_task(api_task=task, - executor=default_executor, - dependencies=operation_dependencies) - - elif isinstance(task, api.task.WorkflowTask): - # Build the graph recursively while adding start and end markers - _construct_execution_tasks( - execution=execution, - task_graph=task, - default_executor=default_executor, - stub_executor=stub_executor, - start_stub_type=models.Task.START_SUBWROFKLOW, - end_stub_type=models.Task.END_SUBWORKFLOW, - depends_on=operation_dependencies - ) - elif isinstance(task, api.task.StubTask): - models.Task(execution=execution, - dependencies=operation_dependencies, - _api_id=task.id, - _executor=stub_executor, - _stub_type=models.Task.STUB, - ) - else: - raise RuntimeError('Undefined state') - - # Insert end marker - models.Task(dependencies=_get_non_dependent_tasks(execution) or [start_task], - execution=execution, - _api_id=_end_graph_suffix(task_graph.id), - _executor=stub_executor, - _stub_type=end_stub_type) - - -def _start_graph_suffix(api_id): - return '{0}-Start'.format(api_id) - - -def _end_graph_suffix(api_id): - return '{0}-End'.format(api_id) - - -def _get_non_dependent_tasks(execution): - tasks_with_dependencies = set() - for task in execution.tasks: - tasks_with_dependencies.update(task.dependencies) - return list(set(execution.tasks) - set(tasks_with_dependencies)) - - -def _get_tasks_from_dependencies(execution, dependencies, default=()): - """ - Returns task list from dependencies. - """ - tasks = [] - for dependency in dependencies: - if getattr(dependency, 'actor', False): - # This is - dependency_name = dependency.id - else: - dependency_name = _end_graph_suffix(dependency.id) - tasks.extend(task for task in execution.tasks if task._api_id == dependency_name) - return tasks or default http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index d5a6e70..d52ae85 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -45,22 +45,23 @@ class Engine(logger.LoggerMixin): """ execute the workflow """ - executing_tasks = [] - if resuming: events.on_resume_workflow_signal.send(ctx) + tasks_tracker = _TasksTracker(ctx) try: events.start_workflow_signal.send(ctx) while True: cancel = self._is_cancel(ctx) if cancel: break - for task in self._ended_tasks(ctx, executing_tasks): - self._handle_ended_tasks(ctx, task, executing_tasks) - for task in self._executable_tasks(ctx): - self._handle_executable_task(ctx, task, executing_tasks) - if self._all_tasks_consumed(ctx): + for task in tasks_tracker.ended_tasks: + self._handle_ended_tasks(task) + tasks_tracker.finished(task) + for task in tasks_tracker.executable_tasks: + tasks_tracker.executing(task) + self._handle_executable_task(ctx, task) + if tasks_tracker.all_tasks_consumed: break else: time.sleep(0.1) @@ -86,34 +87,7 @@ class Engine(logger.LoggerMixin): execution = ctx.model.execution.refresh(ctx.execution) return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED) - def _executable_tasks(self, ctx): - now = datetime.utcnow() - return ( - task for task in self._tasks_iter(ctx) - if task.is_waiting() and task.due_at <= now and \ - not self._task_has_dependencies(ctx, task) - ) - - @staticmethod - def _ended_tasks(ctx, executing_tasks): - for task in executing_tasks: - if task.has_ended() and task in ctx._graph: - yield task - - @staticmethod - def _task_has_dependencies(ctx, task): - return len(ctx._graph.pred.get(task, [])) > 0 - - @staticmethod - def _all_tasks_consumed(ctx): - return len(ctx._graph.node) == 0 - - @staticmethod - def _tasks_iter(ctx): - for task in ctx.execution.tasks: - yield ctx.model.task.refresh(task) - - def _handle_executable_task(self, ctx, task, executing_tasks): + def _handle_executable_task(self, ctx, task): task_executor = self._executors[task._executor] # If the task is a stub, a default context is provided, else it should hold the context cls @@ -129,16 +103,70 @@ class Engine(logger.LoggerMixin): name=task.name ) - executing_tasks.append(task) - if not task._stub_type: events.sent_task_signal.send(op_ctx) task_executor.execute(op_ctx) @staticmethod - def _handle_ended_tasks(ctx, task, executing_tasks): - executing_tasks.remove(task) + def _handle_ended_tasks(task): if task.status == models.Task.FAILED and not task.ignore_failure: raise exceptions.ExecutorException('Workflow failed') - else: - ctx._graph.remove_node(task) + + +class _TasksTracker(object): + def __init__(self, ctx): + self._ctx = ctx + self._tasks = ctx.execution.tasks + self._executed_tasks = [task for task in self._tasks if task.has_ended()] + self._executable_tasks = list(set(self._tasks) - set(self._executed_tasks)) + self._executing_tasks = [] + + @property + def all_tasks_consumed(self): + return len(self._executed_tasks) == len(self._tasks) and len(self._executing_tasks) == 0 + + def executing(self, task): + # Task executing could be retrying (thus removed and added earlier) + if task not in self._executing_tasks: + self._executable_tasks.remove(task) + self._executing_tasks.append(task) + + def finished(self, task): + self._executing_tasks.remove(task) + self._executed_tasks.append(task) + + @property + def ended_tasks(self): + for task in self.executing_tasks: + if task.has_ended(): + yield task + + @property + def executable_tasks(self): + now = datetime.utcnow() + # we need both lists since retrying task are in the executing task list. + for task in self._update_tasks(self._executing_tasks + self._executable_tasks): + if all([task.is_waiting(), + task.due_at <= now, + all(dependency in self._executed_tasks for dependency in task.dependencies) + ]): + yield task + + @property + def executing_tasks(self): + for task in self._update_tasks(self._executing_tasks): + yield task + + @property + def executed_tasks(self): + for task in self._update_tasks(self._executed_tasks): + yield task + + @property + def tasks(self): + for task in self._update_tasks(self._tasks): + yield task + + def _update_tasks(self, tasks): + for task in tasks: + yield self._ctx.model.task.refresh(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/aria/orchestrator/workflows/core/graph_compiler.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/graph_compiler.py b/aria/orchestrator/workflows/core/graph_compiler.py new file mode 100644 index 0000000..f339038 --- /dev/null +++ b/aria/orchestrator/workflows/core/graph_compiler.py @@ -0,0 +1,120 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +from ....modeling import models +from .. import executor, api + + +class GraphCompiler(object): + def __init__(self, ctx, default_executor): + self._ctx = ctx + self._default_executor = default_executor + self._stub_executor = executor.base.StubTaskExecutor + self._model_to_api_id = {} + + def compile(self, + task_graph, + start_stub_type=models.Task.START_WORKFLOW, + end_stub_type=models.Task.END_WORKFLOW, + depends_on=()): + """ + Translates the user graph to the execution graph + :param task_graph: The user's graph + :param start_stub_type: internal use + :param end_stub_type: internal use + :param depends_on: internal use + """ + task_graph = task_graph or self._task_graph + depends_on = list(depends_on) + + # Insert start marker + start_task = self._create_stub_task( + start_stub_type, depends_on, self._start_graph_suffix(task_graph.id), task_graph.name, + ) + + for task in task_graph.topological_order(reverse=True): + dependencies = \ + (self._get_tasks_from_dependencies(task_graph.get_dependencies(task)) + or [start_task]) + + if isinstance(task, api.task.OperationTask): + self._create_operation_task(task, dependencies) + + elif isinstance(task, api.task.WorkflowTask): + # Build the graph recursively while adding start and end markers + self.compile( + task, models.Task.START_SUBWROFKLOW, models.Task.END_SUBWORKFLOW, dependencies + ) + elif isinstance(task, api.task.StubTask): + self._create_stub_task(models.Task.STUB, dependencies, task.id) + else: + raise RuntimeError('Undefined state') + + # Insert end marker + self._create_stub_task( + end_stub_type, + self._get_non_dependent_tasks(self._ctx.execution) or [start_task], + self._end_graph_suffix(task_graph.id), + task_graph.name + ) + + def _create_stub_task(self, stub_type, dependencies, api_id, name=None): + model_task = models.Task( + name=name, + dependencies=dependencies, + execution=self._ctx.execution, + _executor=self._stub_executor, + _stub_type=stub_type) + self._ctx.model.task.put(model_task) + self._model_to_api_id[model_task.id] = api_id + return model_task + + def _create_operation_task(self, api_task, dependencies): + model_task = models.Task.from_api_task( + api_task, self._default_executor, dependencies=dependencies) + self._ctx.model.task.put(model_task) + self._model_to_api_id[model_task.id] = api_task.id + return model_task + + @staticmethod + def _start_graph_suffix(api_id): + return '{0}-Start'.format(api_id) + + @staticmethod + def _end_graph_suffix(api_id): + return '{0}-End'.format(api_id) + + @staticmethod + def _get_non_dependent_tasks(execution): + tasks_with_dependencies = set() + for task in execution.tasks: + tasks_with_dependencies.update(task.dependencies) + return list(set(execution.tasks) - set(tasks_with_dependencies)) + + def _get_tasks_from_dependencies(self, dependencies): + """ + Returns task list from dependencies. + """ + tasks = [] + for dependency in dependencies: + if getattr(dependency, 'actor', False): + # This is + dependency_name = dependency.id + else: + dependency_name = self._end_graph_suffix(dependency.id) + tasks.extend(task for task in self._ctx.execution.tasks + if self._model_to_api_id.get(task.id, None) == dependency_name) + return tasks http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/tests/orchestrator/context/__init__.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py index 086a066..780db07 100644 --- a/tests/orchestrator/context/__init__.py +++ b/tests/orchestrator/context/__init__.py @@ -15,7 +15,7 @@ import sys -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler def op_path(func, module_path=None): @@ -26,7 +26,7 @@ def op_path(func, module_path=None): def execute(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - compile.create_execution_tasks(workflow_context, graph, executor.__class__) + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) eng = engine.Engine(executors={executor.__class__: executor}) eng.execute(workflow_context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/tests/orchestrator/context/test_serialize.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py index 5db5b63..6046a16 100644 --- a/tests/orchestrator/context/test_serialize.py +++ b/tests/orchestrator/context/test_serialize.py @@ -16,7 +16,7 @@ import pytest from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation import tests @@ -48,7 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir): context.model.node.update(node) graph = _mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - compile.create_execution_tasks(context, graph, executor.__class__) + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/tests/orchestrator/execution_plugin/test_local.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py index 1695320..5b94917 100644 --- a/tests/orchestrator/execution_plugin/test_local.py +++ b/tests/orchestrator/execution_plugin/test_local.py @@ -28,7 +28,7 @@ from aria.orchestrator.execution_plugin.exceptions import ProcessException from aria.orchestrator.execution_plugin import local from aria.orchestrator.execution_plugin import constants from aria.orchestrator.workflows.executor import process -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from tests import mock from tests import storage @@ -500,7 +500,7 @@ if __name__ == '__main__': arguments=arguments)) return graph tasks_graph = mock_workflow(ctx=workflow_context) # pylint: disable=no-value-for-parameter - compile.create_execution_tasks(workflow_context, tasks_graph, executor.__class__) + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(tasks_graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(workflow_context) return workflow_context.model.node.get_by_name( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/tests/orchestrator/execution_plugin/test_ssh.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py index fb1dc09..4fa8184 100644 --- a/tests/orchestrator/execution_plugin/test_ssh.py +++ b/tests/orchestrator/execution_plugin/test_ssh.py @@ -29,7 +29,7 @@ from aria.orchestrator import events from aria.orchestrator import workflow from aria.orchestrator.workflows import api from aria.orchestrator.workflows.executor import process -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.exceptions import ExecutorException from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException from aria.orchestrator.execution_plugin import operations @@ -254,8 +254,8 @@ class TestWithActualSSHServer(object): graph.sequence(*ops) return graph tasks_graph = mock_workflow(ctx=self._workflow_context) # pylint: disable=no-value-for-parameter - compile.create_execution_tasks( - self._workflow_context, tasks_graph, self._executor.__class__) + graph_compiler.GraphCompiler( + self._workflow_context, self._executor.__class__).compile(tasks_graph) eng = engine.Engine({self._executor.__class__: self._executor}) eng.execute(self._workflow_context) return self._workflow_context.model.node.get_by_name( http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/tests/orchestrator/test_workflow_runner.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py index ae82476..103596b 100644 --- a/tests/orchestrator/test_workflow_runner.py +++ b/tests/orchestrator/test_workflow_runner.py @@ -27,7 +27,7 @@ from aria.orchestrator.events import on_cancelled_workflow_signal from aria.orchestrator.workflow_runner import WorkflowRunner from aria.orchestrator.workflows.executor.process import ProcessExecutor from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor import thread from aria.orchestrator import ( workflow, @@ -410,7 +410,7 @@ class TestResumableWorkflows(object): def _engine(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) execution = workflow_context.execution - compile.create_execution_tasks(execution, graph, executor.__class__) + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) workflow_context.execution = execution return engine.Engine(executors={executor.__class__: executor}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/tests/orchestrator/workflows/core/test_engine.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py index b77d284..21a53d7 100644 --- a/tests/orchestrator/workflows/core/test_engine.py +++ b/tests/orchestrator/workflows/core/test_engine.py @@ -28,7 +28,7 @@ from aria.orchestrator.workflows import ( api, exceptions, ) -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor import thread from tests import mock, storage @@ -50,7 +50,7 @@ class BaseTest(object): @staticmethod def _engine(workflow_func, workflow_context, executor): graph = workflow_func(ctx=workflow_context) - compile.create_execution_tasks(workflow_context, graph, executor.__class__) + graph_compiler.GraphCompiler(workflow_context, executor.__class__).compile(graph) return engine.Engine(executors={executor.__class__: executor}) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/tests/orchestrator/workflows/core/test_events.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py index 2b82443..30cc8ee 100644 --- a/tests/orchestrator/workflows/core/test_events.py +++ b/tests/orchestrator/workflows/core/test_events.py @@ -16,7 +16,7 @@ import pytest from aria.orchestrator.decorators import operation, workflow -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor.thread import ThreadExecutor from aria.orchestrator.workflows import api from aria.modeling.service_instance import NodeBase @@ -113,10 +113,9 @@ def run_operation_on_node(ctx, op_name, interface_name): operation_name=op_name, operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func))) node.interfaces[interface.name] = interface - compile.create_execution_tasks( - ctx, - single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name), - ThreadExecutor) + graph_compiler.GraphCompiler(ctx, ThreadExecutor).compile( + single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name) + ) eng = engine.Engine(executors={ThreadExecutor: ThreadExecutor()}) eng.execute(ctx) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py index f5fb17a..f0d2b26 100644 --- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py +++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py @@ -13,12 +13,12 @@ # See the License for the specific language governing permissions and # limitations under the License. -from networkx import topological_sort +from networkx import topological_sort, DiGraph from aria.modeling import models from aria.orchestrator import context from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import compile +from aria.orchestrator.workflows.core import graph_compiler from aria.orchestrator.workflows.executor import base from tests import mock from tests import storage @@ -65,9 +65,10 @@ def test_task_graph_into_execution_graph(tmpdir): test_task_graph.add_dependency(inner_task_graph, simple_before_task) test_task_graph.add_dependency(simple_after_task, inner_task_graph) - compile.create_execution_tasks(workflow_context, test_task_graph, base.StubTaskExecutor) + compiler = graph_compiler.GraphCompiler(workflow_context, base.StubTaskExecutor) + compiler.compile(test_task_graph) - execution_tasks = topological_sort(workflow_context._graph) + execution_tasks = topological_sort(_graph(workflow_context.execution.tasks)) assert len(execution_tasks) == 7 @@ -81,7 +82,7 @@ def test_task_graph_into_execution_graph(tmpdir): '{0}-End'.format(test_task_graph.id) ] - assert expected_tasks_names == [t._api_id for t in execution_tasks] + assert expected_tasks_names == [compiler._model_to_api_id[t.id] for t in execution_tasks] assert all(isinstance(task, models.Task) for task in execution_tasks) execution_tasks = iter(execution_tasks) @@ -97,7 +98,6 @@ def test_task_graph_into_execution_graph(tmpdir): def _assert_execution_is_api_task(execution_task, api_task): - assert execution_task._api_id == api_task.id assert execution_task.name == api_task.name assert execution_task.function == api_task.function assert execution_task.actor == api_task.actor @@ -106,3 +106,12 @@ def _assert_execution_is_api_task(execution_task, api_task): def _get_task_by_name(task_name, graph): return graph.node[task_name]['task'] + + +def _graph(tasks): + graph = DiGraph() + for task in tasks: + for dependency in task.dependencies: + graph.add_edge(dependency, task) + + return graph http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/tests/orchestrator/workflows/executor/test_process_executor_extension.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py index ba98c4f..6ed3e2b 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py @@ -17,7 +17,7 @@ import pytest from aria import extension from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation @@ -57,7 +57,7 @@ def test_decorate_extension(context, executor): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - compile.create_execution_tasks(context, graph, executor.__class__) + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) out = get_node(context).attributes.get('out').value http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a75a3dea/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py index 2f1c325..a74a473 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py +++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py @@ -18,7 +18,7 @@ import copy import pytest from aria.orchestrator.workflows import api -from aria.orchestrator.workflows.core import engine, compile +from aria.orchestrator.workflows.core import engine, graph_compiler from aria.orchestrator.workflows.executor import process from aria.orchestrator import workflow, operation from aria.orchestrator.workflows import exceptions @@ -107,7 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None): graph.add_tasks(task) return graph graph = mock_workflow(ctx=context) # pylint: disable=no-value-for-parameter - compile.create_execution_tasks(context, graph, executor.__class__) + graph_compiler.GraphCompiler(context, executor.__class__).compile(graph) eng = engine.Engine({executor.__class__: executor}) eng.execute(context) out = context.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME).attributes.get('out')