From: mxmrlv@apache.org
To: dev@ariatosca.incubator.apache.org
Reply-To: dev@ariatosca.incubator.apache.org
Date: Tue, 15 Nov 2016 17:55:13 -0000
In-Reply-To: <2398c77bf52745a793abfd609eda69b7@git.apache.org>
References: <2398c77bf52745a793abfd609eda69b7@git.apache.org>
Subject: [3/4] incubator-ariatosca git commit: repository_ordering
archived-at: Tue, 15 Nov 2016 17:55:23 -0000

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/api/task_graph.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task_graph.py b/aria/orchestrator/workflows/api/task_graph.py
new file mode 100644
index 0000000..c88d343
--- /dev/null
+++ b/aria/orchestrator/workflows/api/task_graph.py
@@ -0,0 +1,290 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Task graph. Used by users to build workflows
+"""
+
+from uuid import uuid4
+from collections import Iterable
+
+from networkx import DiGraph, topological_sort
+
+from . import task as api_task
+
+
+class TaskNotInGraphError(Exception):
+    """
+    An error representing a scenario where a given task is not in the graph as expected
+    """
+    pass
+
+
+def _filter_out_empty_tasks(func=None):
+    if func is None:
+        return lambda f: _filter_out_empty_tasks(func=f)
+
+    def _wrapper(task, *tasks, **kwargs):
+        return func(*(t for t in [task] + list(tasks) if t), **kwargs)
+    return _wrapper
+
+
+class TaskGraph(object):
+    """
+    A tasks graph builder.
+    Build an operations flow graph
+    """
+
+    def __init__(self, name):
+        self.name = name
+        self._id = str(uuid4())
+        self._graph = DiGraph()
+
+    def __repr__(self):
+        return '{name}(id={self._id}, name={self.name}, graph={self._graph!r})'.format(
+            name=self.__class__.__name__, self=self)
+
+    @property
+    def id(self):
+        """
+        Represents the id of the graph
+        :return: graph id
+        """
+        return self._id
+
+    # graph traversal methods
+
+    @property
+    def tasks(self):
+        """
+        An iterator on tasks added to the graph
+        :yields: Iterator over all tasks in the graph
+        """
+        for _, data in self._graph.nodes_iter(data=True):
+            yield data['task']
+
+    def topological_order(self, reverse=False):
+        """
+        Iterates over the tasks in topological order
+        :param reverse: whether to reverse the sort
+        :yields: Iterator over all tasks in the graph, in topological order
+        """
+        for task_id in topological_sort(self._graph, reverse=reverse):
+            yield self.get_task(task_id)
+
+    def get_dependencies(self, dependent_task):
+        """
+        Iterates over the task's dependencies
+        :param BaseTask dependent_task: The task whose dependencies are requested
+        :yields: Iterator over all tasks which dependent_task depends on
+        :raise: TaskNotInGraphError if dependent_task is not in the graph
+        """
+        if not self.has_tasks(dependent_task):
+            raise TaskNotInGraphError('Task id: {0}'.format(dependent_task.id))
+        for _, dependency_id in self._graph.out_edges_iter(dependent_task.id):
+            yield self.get_task(dependency_id)
+
+    def get_dependents(self, dependency_task):
+        """
+        Iterates over the task's dependents
+        :param BaseTask dependency_task: The task whose dependents are requested
+        :yields: Iterator over all tasks which depend on dependency_task
+        :raise: TaskNotInGraphError if dependency_task is not in the graph
+        """
+        if not self.has_tasks(dependency_task):
+            raise TaskNotInGraphError('Task id: {0}'.format(dependency_task.id))
+        for dependent_id, _ in self._graph.in_edges_iter(dependency_task.id):
+            yield self.get_task(dependent_id)
+
+    # task methods
+
+    def get_task(self, task_id):
+        """
+        Get a task instance that's been inserted to the graph by the task's id
+        :param basestring task_id: The task's id
+        :return: Requested task
+        :rtype: BaseTask
+        :raise: TaskNotInGraphError if no task found in the graph with the given id
+        """
+        if not self._graph.has_node(task_id):
+            raise TaskNotInGraphError('Task id: {0}'.format(task_id))
+        data = self._graph.node[task_id]
+        return data['task']
+
+    @_filter_out_empty_tasks
+    def add_tasks(self, *tasks):
+        """
+        Add tasks to the graph
+        :param BaseTask tasks: The tasks (or iterables of tasks) to add
+        :return: A list of added tasks
+        :rtype: list
+        """
+        assert all([isinstance(task, (api_task.BaseTask, Iterable)) for task in tasks])
+        return_tasks = []
+
+        for task in tasks:
+            if isinstance(task, Iterable):
+                return_tasks += self.add_tasks(*task)
+            elif not self.has_tasks(task):
+                self._graph.add_node(task.id, task=task)
+                return_tasks.append(task)
+
+        return return_tasks
+
+    @_filter_out_empty_tasks
+    def remove_tasks(self, *tasks):
+        """
+        Remove the provided tasks from the graph
+        :param BaseTask tasks: The tasks (or iterables of tasks) to remove
+        :return: A list of removed tasks
+        :rtype: list
+        """
+        return_tasks = []
+
+        for task in tasks:
+            if isinstance(task, Iterable):
+                return_tasks += self.remove_tasks(*task)
+            elif self.has_tasks(task):
+                self._graph.remove_node(task.id)
+                return_tasks.append(task)
+
+        return return_tasks
+
+    @_filter_out_empty_tasks
+    def has_tasks(self, *tasks):
+        """
+        Check whether the given tasks are in the graph
+        :param BaseTask tasks: The tasks (or iterables of tasks) to check
+        :return: True if all tasks are in the graph, otherwise False
+        :rtype: bool
+        """
+        assert all(isinstance(t, (api_task.BaseTask, Iterable)) for t in tasks)
+        return_value = True
+
+        for task in tasks:
+            if isinstance(task, Iterable):
+                return_value &= self.has_tasks(*task)
+            else:
+                return_value &= self._graph.has_node(task.id)
+
+        return return_value
+
+    def add_dependency(self, dependent, dependency):
+        """
+        Add a dependency for one item (task, sequence or parallel) on another
+        The dependent will only be executed after the dependency terminates
+        If either item is a sequence or a parallel,
+         multiple dependencies may be added
+        :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel)
+        :return: True if the dependency between the two hadn't already existed, otherwise False
+        :rtype: bool
+        :raise TaskNotInGraphError if either the dependent or dependency are tasks which
+         are not in the graph
+        """
+        if not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+            raise TaskNotInGraphError()
+
+        if self.has_dependency(dependent, dependency):
+            return
+
+        if isinstance(dependent, Iterable):
+            for dependent_task in dependent:
+                self.add_dependency(dependent_task, dependency)
+        else:
+            if isinstance(dependency, Iterable):
+                for dependency_task in dependency:
+                    self.add_dependency(dependent, dependency_task)
+            else:
+                self._graph.add_edge(dependent.id, dependency.id)
+
+    def has_dependency(self, dependent, dependency):
+        """
+        Check whether one item (task, sequence or parallel) depends on another
+
+        Note that if either item is a sequence or a parallel,
+        and some of the dependencies exist in the graph but not all of them,
+        this method will return False
+
+        :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel)
+        :return: True if the dependency between the two exists, otherwise False
+        :rtype: bool
+        :raise TaskNotInGraphError if either the dependent or dependency are tasks
+        which are not in the graph
+        """
+        if not (dependent and dependency):
+            return False
+        elif not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+            raise TaskNotInGraphError()
+
+        return_value = True
+
+        if isinstance(dependent, Iterable):
+            for dependent_task in dependent:
+                return_value &= self.has_dependency(dependent_task, dependency)
+        else:
+            if isinstance(dependency, Iterable):
+                for dependency_task in dependency:
+                    return_value &= self.has_dependency(dependent, dependency_task)
+            else:
+                return_value &= self._graph.has_edge(dependent.id, dependency.id)
+
+        return return_value
+
+    def remove_dependency(self, dependent, dependency):
+        """
+        Remove a dependency for one item (task, sequence or parallel) on another
+
+        Note that if either item is a sequence or a parallel, and some of
+        the dependencies exist in the graph but not all of them, this method will not remove
+        any of the dependencies and return False
+
+        :param BaseTask|_TasksArrangement dependent: The dependent (task, sequence or parallel)
+        :param BaseTask|_TasksArrangement dependency: The dependency (task, sequence or parallel)
+        :return: False if the dependency between the two hadn't existed, otherwise True
+        :rtype: bool
+        :raise TaskNotInGraphError if either the dependent or dependency are tasks
+        which are not in the graph
+        """
+        if not (self.has_tasks(dependent) and self.has_tasks(dependency)):
+            raise TaskNotInGraphError()
+
+        if not self.has_dependency(dependent, dependency):
+            return
+
+        if isinstance(dependent, Iterable):
+            for dependent_task in dependent:
+                self.remove_dependency(dependent_task, dependency)
+        elif isinstance(dependency, Iterable):
+            for dependency_task in dependency:
+                self.remove_dependency(dependent, dependency_task)
+        else:
+            self._graph.remove_edge(dependent.id, dependency.id)
+
+    @_filter_out_empty_tasks
+    def sequence(self, *tasks):
+        """
+        Create and insert a sequence into the graph, effectively each task i depends on i-1
+        :param tasks: an iterable of dependencies
+        :return: the provided tasks
+        """
+        if tasks:
+            self.add_tasks(*tasks)
+
+            for i in xrange(1, len(tasks)):
+                self.add_dependency(tasks[i], tasks[i-1])
+
+        return tasks
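
The `sequence` rule in the file above (task i depends on task i-1) can be sketched on its own. This is only an illustration, not ARIA's API: it assumes Python 3.9+ and uses the standard library's `graphlib.TopologicalSorter` as a stand-in for the networkx graph, and `sequence_dependencies` is a hypothetical helper name.

```python
from graphlib import TopologicalSorter  # Python 3.9+; stand-in for networkx here


def sequence_dependencies(task_ids):
    """Hypothetical helper: map each task id to the set of ids it depends on.

    Mirrors the rule in TaskGraph.sequence: task i depends on task i-1.
    """
    dependencies = {task_id: set() for task_id in task_ids}
    for previous, current in zip(task_ids, task_ids[1:]):
        dependencies[current].add(previous)
    return dependencies


dependencies = sequence_dependencies(['create', 'configure', 'start'])

# TopologicalSorter takes {node: predecessors}, so dependencies are emitted first.
execution_order = list(TopologicalSorter(dependencies).static_order())
print(execution_order)
```

Because every edge points from a dependent to its dependency, an execution order has to emit dependencies before the tasks that depend on them.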
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/builtin/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/__init__.py b/aria/orchestrator/workflows/builtin/__init__.py
new file mode 100644
index 0000000..0449a8e
--- /dev/null
+++ b/aria/orchestrator/workflows/builtin/__init__.py
@@ -0,0 +1,31 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+A set of builtin workflows
+"""
+
+from .install import install
+from .uninstall import uninstall
+from .execute_operation import execute_operation
+from .heal import heal
+
+
+__all__ = [
+    'install',
+    'uninstall',
+    'execute_operation',
+    'heal',
+]

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/builtin/execute_operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py
new file mode 100644
index 0000000..ddbb8e7
--- /dev/null
+++ b/aria/orchestrator/workflows/builtin/execute_operation.py
@@ -0,0 +1,104 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Builtin execute_operation workflow
+"""
+
+from aria import workflow
+
+from .workflows import execute_operation_on_instance
+
+
+@workflow
+def execute_operation(
+        ctx,
+        graph,
+        operation,
+        operation_kwargs,
+        allow_kwargs_override,
+        run_by_dependency_order,
+        type_names,
+        node_ids,
+        node_instance_ids,
+        **kwargs):
+    """
+    The execute_operation workflow
+
+    :param WorkflowContext ctx: the workflow context
+    :param TaskGraph graph: the graph which will describe the workflow.
+    :param basestring operation: the operation name to execute
+    :param dict operation_kwargs:
+    :param bool allow_kwargs_override:
+    :param bool run_by_dependency_order:
+    :param type_names:
+    :param node_ids:
+    :param node_instance_ids:
+    :param kwargs:
+    :return:
+    """
+    subgraphs = {}
+    # filtering node instances
+    filtered_node_instances = list(_filter_node_instances(
+        context=ctx,
+        node_ids=node_ids,
+        node_instance_ids=node_instance_ids,
+        type_names=type_names))
+
+    if run_by_dependency_order:
+        filtered_node_instances_ids = set(node_instance.id
+                                          for node_instance in filtered_node_instances)
+        for node_instance in ctx.node_instances:
+            if node_instance.id not in filtered_node_instances_ids:
+                subgraphs[node_instance.id] = ctx.task_graph(
+                    name='execute_operation_stub_{0}'.format(node_instance.id))
+
+    # registering actual tasks to sequences
+    for node_instance in filtered_node_instances:
+        graph.add_tasks(
+            execute_operation_on_instance(
+                node_instance=node_instance,
+                operation=operation,
+                operation_kwargs=operation_kwargs,
+                allow_kwargs_override=allow_kwargs_override
+            )
+        )
+
+    for _, node_instance_sub_workflow in subgraphs.items():
+        graph.add_tasks(node_instance_sub_workflow)
+
+    # adding tasks dependencies if required
+    if run_by_dependency_order:
+        for node_instance in ctx.node_instances:
+            for relationship_instance in node_instance.relationship_instances:
+                graph.add_dependency(source_task=subgraphs[node_instance.id],
+                                     after=[subgraphs[relationship_instance.target_id]])
+
+
+def _filter_node_instances(context, node_ids=(), node_instance_ids=(), type_names=()):
+    def _is_node_by_id(node_id):
+        return not node_ids or node_id in node_ids
+
+    def _is_node_instance_by_id(node_instance_id):
+        return not node_instance_ids or node_instance_id in node_instance_ids
+
+    def _is_node_by_type(node_type_hierarchy):
+        return not type_names or node_type_hierarchy in type_names
+
+    for node_instance in context.node_instances:
+        if all((_is_node_by_id(node_instance.node.id),
+                _is_node_instance_by_id(node_instance.id),
+                _is_node_by_type(node_instance.node.type_hierarchy))):
+            yield node_instance

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/builtin/heal.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/heal.py b/aria/orchestrator/workflows/builtin/heal.py
new file mode 100644
index 0000000..dbfc14e
--- /dev/null
+++ b/aria/orchestrator/workflows/builtin/heal.py
@@ -0,0 +1,174 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Builtin heal workflow
+"""
+
+from aria import workflow
+
+from .workflows import relationship_tasks, install_node_instance, uninstall_node_instance
+from ..api import task
+
+
+@workflow
+def heal(ctx, graph, node_instance_id):
+    """
+    The heal workflow
+
+    :param WorkflowContext ctx: the workflow context
+    :param TaskGraph graph: the graph which will describe the workflow.
+    :param node_instance_id: the id of the node instance to heal
+    :return:
+    """
+    failing_node = ctx.model.node_instance.get(node_instance_id)
+    host_node = ctx.model.node_instance.get(failing_node.host_id)
+    failed_node_instance_subgraph = _get_contained_subgraph(ctx, host_node)
+    failed_node_instance_ids = list(n.id for n in failed_node_instance_subgraph)
+
+    targeted_node_instances = [node_instance for node_instance in ctx.node_instances
+                               if node_instance.id not in failed_node_instance_ids]
+
+    uninstall_subgraph = task.WorkflowTask(
+        heal_uninstall,
+        failing_node_instances=failed_node_instance_subgraph,
+        targeted_node_instances=targeted_node_instances
+    )
+
+    install_subgraph = task.WorkflowTask(
+        heal_install,
+        failing_node_instances=failed_node_instance_subgraph,
+        targeted_node_instances=targeted_node_instances)
+
+    graph.sequence(uninstall_subgraph, install_subgraph)
+
+
+@workflow(suffix_template='{failing_node_instances}')
+def heal_uninstall(ctx, graph, failing_node_instances, targeted_node_instances):
+    """
+    the uninstall part of the heal mechanism
+    :param WorkflowContext ctx: the workflow context
+    :param TaskGraph graph: the task graph to edit.
+    :param failing_node_instances: the failing nodes to heal.
+    :param targeted_node_instances: the targets of the relationships where a failing node is the
+    source
+    :return:
+    """
+    node_instance_sub_workflows = {}
+
+    # Create a stub task for each unaffected node instance
+    for node_instance in targeted_node_instances:
+        node_instance_stub = task.StubTask()
+        node_instance_sub_workflows[node_instance.id] = node_instance_stub
+        graph.add_tasks(node_instance_stub)
+
+    # create uninstall sub workflow for every failing node instance
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = task.WorkflowTask(uninstall_node_instance,
+                                                       node_instance=node_instance)
+        node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
+        graph.add_tasks(node_instance_sub_workflow)
+
+    # create dependencies between the node instance sub workflows
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
+        for relationship_instance in reversed(node_instance.relationship_instances):
+            graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id],
+                                 node_instance_sub_workflow)
+
+    # Add operations for intact nodes depending on a node instance belonging to node_instances
+    for node_instance in targeted_node_instances:
+        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
+
+        for relationship_instance in reversed(node_instance.relationship_instances):
+            target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id)
+            target_node_instance_subgraph = node_instance_sub_workflows[target_node_instance.id]
+            graph.add_dependency(target_node_instance_subgraph, node_instance_sub_workflow)
+
+            if target_node_instance in failing_node_instances:
+                dependency = relationship_tasks(
+                    relationship_instance=relationship_instance,
+                    operation_name='aria.interfaces.relationship_lifecycle.unlink')
+                graph.add_tasks(*dependency)
+                graph.add_dependency(node_instance_sub_workflow, dependency)
+
+
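
The dependency direction in `heal_uninstall` above (the target's sub workflow depends on the source's) is the mirror image of the install direction used elsewhere in this commit. A minimal standalone sketch of the two orderings follows. It assumes Python 3.9+ and uses the standard library's `graphlib` as a stand-in for `TaskGraph`; the `relationships` topology and both helper names are hypothetical.

```python
from graphlib import TopologicalSorter  # Python 3.9+; stand-in, not ARIA's TaskGraph

# Hypothetical topology: each source instance lists its relationship targets.
relationships = {'app': ['db'], 'db': ['host'], 'host': []}


def uninstall_dependencies(relationships):
    """Target depends on source, as in heal_uninstall: sources are removed first."""
    dependencies = {node: set() for node in relationships}
    for source, targets in relationships.items():
        for target in targets:
            dependencies[target].add(source)
    return dependencies


def install_dependencies(relationships):
    """Source depends on its targets: targets are installed first."""
    return {source: set(targets) for source, targets in relationships.items()}


uninstall_order = list(
    TopologicalSorter(uninstall_dependencies(relationships)).static_order())
install_order = list(
    TopologicalSorter(install_dependencies(relationships)).static_order())
print(uninstall_order, install_order)
```

Under these assumptions the two orders are exact reverses of each other, which is why `heal` can run the uninstall part and then the install part as a plain two-step sequence.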
+@workflow(suffix_template='{failing_node_instances}')
+def heal_install(ctx, graph, failing_node_instances, targeted_node_instances):
+    """
+    the install part of the heal mechanism
+    :param WorkflowContext ctx: the workflow context
+    :param TaskGraph graph: the task graph to edit.
+    :param failing_node_instances: the failing nodes to heal.
+    :param targeted_node_instances: the targets of the relationships where a failing node is the
+    source
+    :return:
+    """
+    node_instance_sub_workflows = {}
+
+    # Create a stub task for each unaffected node instance
+    for node_instance in targeted_node_instances:
+        node_instance_stub = task.StubTask()
+        node_instance_sub_workflows[node_instance.id] = node_instance_stub
+        graph.add_tasks(node_instance_stub)
+
+    # create install sub workflow for every failing node instance
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = task.WorkflowTask(install_node_instance,
+                                                       node_instance=node_instance)
+        node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
+        graph.add_tasks(node_instance_sub_workflow)
+
+    # create dependencies between the node instance sub workflows
+    for node_instance in failing_node_instances:
+        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
+        if node_instance.relationship_instances:
+            dependencies = [node_instance_sub_workflows[relationship_instance.target_id]
+                            for relationship_instance in node_instance.relationship_instances]
+            graph.add_dependency(node_instance_sub_workflow, dependencies)
+
+    # Add operations for intact nodes depending on a node instance
+    # belonging to node_instances
+    for node_instance in targeted_node_instances:
+        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
+
+        for relationship_instance in node_instance.relationship_instances:
+            target_node_instance = ctx.model.node_instance.get(relationship_instance.target_id)
+            target_node_instance_subworkflow = node_instance_sub_workflows[target_node_instance.id]
+            graph.add_dependency(node_instance_sub_workflow, target_node_instance_subworkflow)
+
+            if target_node_instance in failing_node_instances:
+                dependent = relationship_tasks(
+                    relationship_instance=relationship_instance,
+                    operation_name='aria.interfaces.relationship_lifecycle.establish')
+                graph.add_tasks(*dependent)
+                graph.add_dependency(dependent, node_instance_sub_workflow)
+
+
+def _get_contained_subgraph(context, host_node_instance):
+    contained_instances = [node_instance
+                           for node_instance in context.node_instances
+                           if node_instance.host_id == host_node_instance.id and
+                           node_instance.id != node_instance.host_id]
+    result = [host_node_instance]
+
+    if not contained_instances:
+        return result
+
+    result.extend(contained_instances)
+    for node_instance in contained_instances:
+        result.extend(_get_contained_subgraph(context, node_instance))
+
+    return set(result)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/builtin/install.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/install.py b/aria/orchestrator/workflows/builtin/install.py
new file mode 100644
index 0000000..0ab3ad6
--- /dev/null
+++ b/aria/orchestrator/workflows/builtin/install.py
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Builtin install workflow
+"""
+
+from aria import workflow
+
+from .workflows import install_node_instance
+from ..api import task
+
+
+@workflow
+def install(ctx, graph, node_instances=(), node_instance_sub_workflows=None):
+    """
+    The install workflow
+    :param WorkflowContext ctx: the workflow context
+    :param TaskGraph graph: the graph which will describe the workflow.
+    :param node_instances: the node instances on which to run the workflow
+    :param dict node_instance_sub_workflows: a dictionary of subworkflows with id as key and
+    TaskGraph (or OperationContext) as value
+    :return:
+    """
+    node_instance_sub_workflows = node_instance_sub_workflows or {}
+    node_instances = node_instances or list(ctx.node_instances)
+
+    # create install sub workflow for every node instance
+    for node_instance in node_instances:
+        node_instance_sub_workflow = task.WorkflowTask(install_node_instance,
+                                                       node_instance=node_instance)
+        node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow
+        graph.add_tasks(node_instance_sub_workflow)
+
+    # create dependencies between the node instance sub workflows
+    for node_instance in node_instances:
+        node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id]
+        if node_instance.relationship_instances:
+            dependencies = [node_instance_sub_workflows[relationship_instance.target_id]
+                            for relationship_instance in node_instance.relationship_instances]
+            graph.add_dependency(node_instance_sub_workflow, dependencies)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/builtin/uninstall.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/uninstall.py b/aria/orchestrator/workflows/builtin/uninstall.py
new file mode 100644
index 0000000..f4e965c
--- /dev/null
+++ b/aria/orchestrator/workflows/builtin/uninstall.py
@@ -0,0 +1,52 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Builtin uninstall workflow
+"""
+
+from aria import workflow
+
+from .workflows import uninstall_node_instance
+from ..api import task
+
+
+@workflow
+def uninstall(ctx, graph, node_instances=(), node_instance_sub_workflows=None):
+    """
+    The uninstall workflow
+    :param WorkflowContext ctx: the workflow context
+    :param TaskGraph graph: the graph which will describe the workflow.
+ :param node_instances: the node instances on which to run the workflow + :param dict node_instance_sub_workflows: a dictionary of subworkflows with id as key and + TaskGraph (or OperationContext) as value + :return: + """ + node_instance_sub_workflows = node_instance_sub_workflows or {} + node_instances = node_instances or list(ctx.node_instances) + + # create install sub workflow for every node instance + for node_instance in node_instances: + node_instance_sub_workflow = task.WorkflowTask(uninstall_node_instance, + node_instance=node_instance) + node_instance_sub_workflows[node_instance.id] = node_instance_sub_workflow + graph.add_tasks(node_instance_sub_workflow) + + # create dependencies between the node instance sub workflow + for node_instance in node_instances: + node_instance_sub_workflow = node_instance_sub_workflows[node_instance.id] + for relationship_instance in reversed(node_instance.relationship_instances): + graph.add_dependency(node_instance_sub_workflows[relationship_instance.target_id], + node_instance_sub_workflow) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/builtin/workflows.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/builtin/workflows.py b/aria/orchestrator/workflows/builtin/workflows.py new file mode 100644 index 0000000..0eb8c34 --- /dev/null +++ b/aria/orchestrator/workflows/builtin/workflows.py @@ -0,0 +1,215 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. 
You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +A set of builtin workflows. +""" + +from itertools import groupby + +from aria import workflow + +from ..api import task + + +__all__ = ( + 'install_node_instance', + 'uninstall_node_instance', + 'execute_operation_on_instance', +) + + +# Install node instance workflow and sub workflows + +@workflow(suffix_template='{node_instance.id}') +def install_node_instance(graph, node_instance, **kwargs): + """ + A workflow which installs a node instance. + :param WorkflowContext ctx: the workflow context + :param TaskGraph graph: the task graph to edit + :param node_instance: the node instance to install + :return: + """ + create_node_instance = task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.create') + + configure_node_instance = task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.configure') + start_node_instance = task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.start') + + graph.sequence( + create_node_instance, + preconfigure_relationship(graph, node_instance), + configure_node_instance, + postconfigure_relationship(graph, node_instance), + start_node_instance, + establish_relationship(graph, node_instance) + ) + + return graph + + +def preconfigure_relationship(graph, node_instance, **kwargs): + """ + + :param context: + :param graph: + :param node_instance: + :return: + """ + return relationships_tasks( + graph=graph, + operation_name='aria.interfaces.relationship_lifecycle.preconfigure',
node_instance=node_instance) + + +def postconfigure_relationship(graph, node_instance, **kwargs): + """ + + :param context: + :param graph: + :param node_instance: + :return: + """ + return relationships_tasks( + graph=graph, + operation_name='aria.interfaces.relationship_lifecycle.postconfigure', + node_instance=node_instance) + + +def establish_relationship(graph, node_instance, **kwargs): + """ + + :param context: + :param graph: + :param node_instance: + :return: + """ + return relationships_tasks( + graph=graph, + operation_name='aria.interfaces.relationship_lifecycle.establish', + node_instance=node_instance) + + +# Uninstall node instance workflow and subworkflows + +@workflow(suffix_template='{node_instance.id}') +def uninstall_node_instance(graph, node_instance, **kwargs): + """ + A workflow which uninstalls a node instance. + :param WorkflowContext context: the workflow context + :param TaskGraph graph: the task graph to edit + :param node_instance: the node instance to uninstall + :return: + """ + stop_node_instance = task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.stop') + delete_node_instance = task.OperationTask.node_instance( + instance=node_instance, + name='aria.interfaces.lifecycle.delete') + + graph.sequence( + stop_node_instance, + unlink_relationship(graph, node_instance), + delete_node_instance + ) + + +def unlink_relationship(graph, node_instance): + """ + + :param context: + :param graph: + :param node_instance: + :return: + """ + return relationships_tasks( + graph=graph, + operation_name='aria.interfaces.relationship_lifecycle.unlink', + node_instance=node_instance + ) + + +def execute_operation_on_instance( + node_instance, + operation, + operation_kwargs, + allow_kwargs_override): + """ + A workflow which executes a single operation + :param node_instance: the node instance on which to execute the operation + :param basestring operation: the operation name + :param dict operation_kwargs: + :param bool
allow_kwargs_override: + :return: + """ + + if allow_kwargs_override is not None: + operation_kwargs['allow_kwargs_override'] = allow_kwargs_override + + return task.OperationTask.node_instance( + instance=node_instance, + name=operation, + inputs=operation_kwargs) + + +def relationships_tasks(graph, operation_name, node_instance): + """ + Creates a relationship task (source and target) for all of a node_instance relationships. + :param basestring operation_name: the relationship operation name. + :param WorkflowContext context: + :param NodeInstance node_instance: + :return: + """ + relationships_groups = groupby( + node_instance.relationship_instances, + key=lambda relationship_instance: relationship_instance.relationship.target_id) + + sub_tasks = [] + for _, (_, relationship_group) in enumerate(relationships_groups): + for relationship_instance in relationship_group: + relationship_operations = relationship_tasks( + relationship_instance=relationship_instance, + operation_name=operation_name) + sub_tasks.append(relationship_operations) + + return graph.sequence(*sub_tasks) + + +def relationship_tasks(relationship_instance, operation_name): + """ + Creates a relationship task source and target. 
+ :param NodeInstance node_instance: the node instance of the relationship + :param RelationshipInstance relationship_instance: the relationship instance itself + :param WorkflowContext context: + :param operation_name: + :param index: the relationship index - enables pretty print + :return: + """ + source_operation = task.OperationTask.relationship_instance( + instance=relationship_instance, + name=operation_name, + operation_end=task.OperationTask.SOURCE_OPERATION) + target_operation = task.OperationTask.relationship_instance( + instance=relationship_instance, + name=operation_name, + operation_end=task.OperationTask.TARGET_OPERATION) + + return source_operation, target_operation http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/core/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/__init__.py b/aria/orchestrator/workflows/core/__init__.py new file mode 100644 index 0000000..e377153 --- /dev/null +++ b/aria/orchestrator/workflows/core/__init__.py @@ -0,0 +1,20 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Core for the workflow execution mechanism +""" + +from . 
import task, translation, engine http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py new file mode 100644 index 0000000..87ea8c6 --- /dev/null +++ b/aria/orchestrator/workflows/core/engine.py @@ -0,0 +1,116 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +The workflow engine. Executes workflows +""" + +import time +from datetime import datetime + +import networkx + +from aria import logger +from aria.storage import models +from aria.orchestrator import events + +from .. import exceptions +from . import task as engine_task +from . import translation + + +class Engine(logger.LoggerMixin): + """ + The workflow engine. 
Executes workflows + """ + + def __init__(self, executor, workflow_context, tasks_graph, **kwargs): + super(Engine, self).__init__(**kwargs) + self._workflow_context = workflow_context + self._execution_graph = networkx.DiGraph() + self._executor = executor + translation.build_execution_graph(task_graph=tasks_graph, + execution_graph=self._execution_graph) + + def execute(self): + """ + execute the workflow + """ + try: + events.start_workflow_signal.send(self._workflow_context) + cancel = False + while True: + cancel = self._is_cancel() + if cancel: + break + for task in self._ended_tasks(): + self._handle_ended_tasks(task) + for task in self._executable_tasks(): + self._handle_executable_task(task) + if self._all_tasks_consumed(): + break + else: + time.sleep(0.1) + if cancel: + events.on_cancelled_workflow_signal.send(self._workflow_context) + else: + events.on_success_workflow_signal.send(self._workflow_context) + except BaseException as e: + events.on_failure_workflow_signal.send(self._workflow_context, exception=e) + raise + + def cancel_execution(self): + """ + Send a cancel request to the engine. If execution already started, execution status + will be modified to 'cancelling' status. If execution is in pending mode, execution status + will be modified to 'cancelled' directly. 
+ """ + events.on_cancelling_workflow_signal.send(self._workflow_context) + + def _is_cancel(self): + return self._workflow_context.execution.status in [models.Execution.CANCELLING, + models.Execution.CANCELLED] + + def _executable_tasks(self): + now = datetime.utcnow() + return (task for task in self._tasks_iter() + if task.status in models.Task.WAIT_STATES and + task.due_at <= now and + not self._task_has_dependencies(task)) + + def _ended_tasks(self): + return (task for task in self._tasks_iter() if task.status in models.Task.END_STATES) + + def _task_has_dependencies(self, task): + return len(self._execution_graph.pred.get(task.id, {})) > 0 + + def _all_tasks_consumed(self): + return len(self._execution_graph.node) == 0 + + def _tasks_iter(self): + return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True)) + + def _handle_executable_task(self, task): + if isinstance(task, engine_task.StubTask): + task.status = models.Task.SUCCESS + else: + events.sent_task_signal.send(task) + self._executor.execute(task) + + def _handle_ended_tasks(self, task): + if task.status == models.Task.FAILED and not task.ignore_failure: + raise exceptions.ExecutorException('Workflow failed') + else: + self._execution_graph.remove_node(task.id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py new file mode 100644 index 0000000..a583cfc --- /dev/null +++ b/aria/orchestrator/workflows/core/task.py @@ -0,0 +1,243 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow tasks +""" +from contextlib import contextmanager +from datetime import datetime +from functools import ( + partial, + wraps, +) + +from aria import logger +from aria.storage import models +from aria.orchestrator.context import operation as operation_context + +from .. import exceptions + + +def _locked(func=None): + if func is None: + return partial(_locked, func=_locked) + + @wraps(func) + def _wrapper(self, value, **kwargs): + if self._update_fields is None: + raise exceptions.TaskException("Task is not in update mode") + return func(self, value, **kwargs) + return _wrapper + + +class BaseTask(logger.LoggerMixin): + """ + Base class for Task objects + """ + + def __init__(self, id, *args, **kwargs): + super(BaseTask, self).__init__(*args, **kwargs) + self._id = id + + @property + def id(self): + """ + :return: the task's id + """ + return self._id + + +class StubTask(BaseTask): + """ + Base stub task for all tasks that don't actually run anything + """ + + def __init__(self, *args, **kwargs): + super(StubTask, self).__init__(*args, **kwargs) + self.status = models.Task.PENDING + self.due_at = datetime.utcnow() + + +class StartWorkflowTask(StubTask): + """ + Tasks marking a workflow start + """ + pass + + +class EndWorkflowTask(StubTask): + """ + Tasks marking a workflow end + """ + pass + + +class StartSubWorkflowTask(StubTask): + """ + Tasks marking a subworkflow start + """ + 
pass + + +class EndSubWorkflowTask(StubTask): + """ + Tasks marking a subworkflow end + """ + pass + + +class OperationTask(BaseTask): + """ + Operation tasks + """ + + def __init__(self, api_task, *args, **kwargs): + super(OperationTask, self).__init__(id=api_task.id, **kwargs) + self._workflow_context = api_task._workflow_context + task_model = api_task._workflow_context.model.task.model_cls + operation_task = task_model( + id=api_task.id, + name=api_task.name, + operation_mapping=api_task.operation_mapping, + actor=api_task.actor, + inputs=api_task.inputs, + status=task_model.PENDING, + execution_id=self._workflow_context._execution_id, + max_attempts=api_task.max_attempts, + retry_interval=api_task.retry_interval, + ignore_failure=api_task.ignore_failure + ) + + if isinstance(api_task.actor, models.NodeInstance): + context_class = operation_context.NodeOperationContext + elif isinstance(api_task.actor, models.RelationshipInstance): + context_class = operation_context.RelationshipOperationContext + else: + raise RuntimeError('No operation context could be created for {0}' + .format(api_task.actor.model_cls)) + + self._ctx = context_class(name=api_task.name, + workflow_context=self._workflow_context, + task=operation_task) + self._workflow_context.model.task.store(operation_task) + self._task_id = operation_task.id + self._update_fields = None + + @contextmanager + def _update(self): + """ + A context manager which puts the task into update mode, enabling fields update. 
+ :yields: None + """ + self._update_fields = {} + try: + yield + task = self.model_task + for key, value in self._update_fields.items(): + setattr(task, key, value) + self.model_task = task + finally: + self._update_fields = None + + @property + def model_task(self): + """ + Returns the task model in storage + :return: task in storage + """ + return self._workflow_context.model.task.get(self._task_id) + + @model_task.setter + def model_task(self, value): + self._workflow_context.model.task.store(value) + + @property + def context(self): + """ + Contexts for the operation + :return: + """ + return self._ctx + + @property + def status(self): + """ + Returns the task status + :return: task status + """ + return self.model_task.status + + @status.setter + @_locked + def status(self, value): + self._update_fields['status'] = value + + @property + def started_at(self): + """ + Returns when the task started + :return: when task started + """ + return self.model_task.started_at + + @started_at.setter + @_locked + def started_at(self, value): + self._update_fields['started_at'] = value + + @property + def ended_at(self): + """ + Returns when the task ended + :return: when task ended + """ + return self.model_task.ended_at + + @ended_at.setter + @_locked + def ended_at(self, value): + self._update_fields['ended_at'] = value + + @property + def retry_count(self): + """ + Returns the retry count for the task + :return: retry count + """ + return self.model_task.retry_count + + @retry_count.setter + @_locked + def retry_count(self, value): + self._update_fields['retry_count'] = value + + @property + def due_at(self): + """ + Returns the minimum datetime in which the task can be executed + :return: eta + """ + return self.model_task.due_at + + @due_at.setter + @_locked + def due_at(self, value): + self._update_fields['due_at'] = value + + def __getattr__(self, attr): + try: + return getattr(self.model_task, attr) + except AttributeError: + return super(OperationTask, 
self).__getattribute__(attr) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py new file mode 100644 index 0000000..b6cbdad --- /dev/null +++ b/aria/orchestrator/workflows/core/translation.py @@ -0,0 +1,106 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Translation of user graph's API to the execution graph +""" + +from .. import api +from . 
import task as core_task + + +def build_execution_graph( + task_graph, + execution_graph, + start_cls=core_task.StartWorkflowTask, + end_cls=core_task.EndWorkflowTask, + depends_on=()): + """ + Translates the user graph to the execution graph + :param task_graph: The user's graph + :param workflow_context: The workflow + :param execution_graph: The execution graph that is being built + :param start_cls: internal use + :param end_cls: internal use + :param depends_on: internal use + """ + # Insert start marker + start_task = start_cls(id=_start_graph_suffix(task_graph.id)) + _add_task_and_dependencies(execution_graph, start_task, depends_on) + + for api_task in task_graph.topological_order(reverse=True): + dependencies = task_graph.get_dependencies(api_task) + operation_dependencies = _get_tasks_from_dependencies( + execution_graph, + dependencies, + default=[start_task]) + + if isinstance(api_task, api.task.OperationTask): + # Add the task and its dependencies + operation_task = core_task.OperationTask(api_task) + _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) + elif isinstance(api_task, api.task.WorkflowTask): + # Build the graph recursively while adding start and end markers + build_execution_graph( + task_graph=api_task, + execution_graph=execution_graph, + start_cls=core_task.StartSubWorkflowTask, + end_cls=core_task.EndSubWorkflowTask, + depends_on=operation_dependencies + ) + elif isinstance(api_task, api.task.StubTask): + stub_task = core_task.StubTask(id=api_task.id) + _add_task_and_dependencies(execution_graph, stub_task, operation_dependencies) + else: + raise RuntimeError('Undefined state') + + # Insert end marker + workflow_dependencies = _get_tasks_from_dependencies( + execution_graph, + _get_non_dependency_tasks(task_graph), + default=[start_task]) + end_task = end_cls(id=_end_graph_suffix(task_graph.id)) + _add_task_and_dependencies(execution_graph, end_task, workflow_dependencies) + + +def
_add_task_and_dependencies(execution_graph, operation_task, operation_dependencies=()): + execution_graph.add_node(operation_task.id, task=operation_task) + for dependency in operation_dependencies: + execution_graph.add_edge(dependency.id, operation_task.id) + + +def _get_tasks_from_dependencies(execution_graph, dependencies, default=()): + """ + Returns task list from dependencies. + """ + return [execution_graph.node[dependency.id + if isinstance(dependency, (api.task.OperationTask, + api.task.StubTask)) + else _end_graph_suffix(dependency.id)]['task'] + for dependency in dependencies] or default + + +def _start_graph_suffix(id): + return '{0}-Start'.format(id) + + +def _end_graph_suffix(id): + return '{0}-End'.format(id) + + +def _get_non_dependency_tasks(graph): + for task in graph.tasks: + if len(list(graph.get_dependents(task))) == 0: + yield task http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/exceptions.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/exceptions.py b/aria/orchestrator/workflows/exceptions.py new file mode 100644 index 0000000..e2f5b59 --- /dev/null +++ b/aria/orchestrator/workflows/exceptions.py @@ -0,0 +1,71 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Workflow related Exception classes +""" +from .. import exceptions + + +class ExecutorException(exceptions.AriaError): + """ + General executor exception + """ + pass + + +class ProcessException(ExecutorException): + """ + Raised when subprocess execution fails + """ + + def __init__(self, command, stderr=None, stdout=None, return_code=None): + """ + Process class Exception + :param list command: child process command + :param str message: custom message + :param str stderr: child process stderr + :param str stdout: child process stdout + :param int return_code: child process exit code + """ + super(ProcessException, self).__init__("child process failed") + self.command = command + self.stderr = stderr + self.stdout = stdout + self.return_code = return_code + + @property + def explanation(self): + """ + Describes the error in detail + """ + return ( + 'Command "{error.command}" executed with an error.\n' + 'code: {error.return_code}\n' + 'error: {error.stderr}\n' + 'output: {error.stdout}'.format(error=self)) + + +class AriaEngineError(exceptions.AriaError): + """ + Raised by the workflow engine + """ + + +class TaskException(exceptions.AriaError): + """ + Raised by the task + """ + pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/executor/__init__.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/__init__.py b/aria/orchestrator/workflows/executor/__init__.py new file mode 100644 index 0000000..09fb12c --- /dev/null +++ b/aria/orchestrator/workflows/executor/__init__.py @@ -0,0 +1,21 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Executors for task execution +""" + + +from . import blocking, celery, multiprocess, thread http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py new file mode 100644 index 0000000..ba44124 --- /dev/null +++ b/aria/orchestrator/workflows/executor/base.py @@ -0,0 +1,54 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Base executor module +""" + +from aria.orchestrator import events + + +class BaseExecutor(object): + """ + Base class for executors for running tasks + """ + + def __init__(self, *args, **kwargs): + pass + + def execute(self, task): + """ + Execute a task + :param task: task to execute + """ + raise NotImplementedError + + def close(self): + """ + Close the executor + """ + pass + + @staticmethod + def _task_started(task): + events.start_task_signal.send(task) + + @staticmethod + def _task_failed(task, exception): + events.on_failure_task_signal.send(task, exception=exception) + + @staticmethod + def _task_succeeded(task): + events.on_success_task_signal.send(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/executor/blocking.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/blocking.py b/aria/orchestrator/workflows/executor/blocking.py new file mode 100644 index 0000000..30bebbe --- /dev/null +++ b/aria/orchestrator/workflows/executor/blocking.py @@ -0,0 +1,36 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +""" +Blocking executor +""" + +from aria.tools import module +from .base import BaseExecutor + + +class CurrentThreadBlockingExecutor(BaseExecutor): + """ + Executor which runs tasks in the current thread (blocking) + """ + + def execute(self, task): + self._task_started(task) + try: + task_func = module.load_attribute(task.operation_mapping) + task_func(ctx=task.context, **task.inputs) + self._task_succeeded(task) + except BaseException as e: + self._task_failed(task, exception=e) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py new file mode 100644 index 0000000..baa97bd --- /dev/null +++ b/aria/orchestrator/workflows/executor/celery.py @@ -0,0 +1,97 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+
+"""
+Celery based executor
+"""
+
+import threading
+import Queue
+
+from .base import BaseExecutor
+
+
+class CeleryExecutor(BaseExecutor):
+    """
+    Executor which runs tasks using celery
+    """
+
+    def __init__(self, app, *args, **kwargs):
+        super(CeleryExecutor, self).__init__(*args, **kwargs)
+        self._app = app
+        self._started_signaled = False
+        self._started_queue = Queue.Queue(maxsize=1)
+        self._tasks = {}
+        self._results = {}
+        self._receiver = None
+        self._stopped = False
+        self._receiver_thread = threading.Thread(target=self._events_receiver)
+        self._receiver_thread.daemon = True
+        self._receiver_thread.start()
+        self._started_queue.get(timeout=30)
+
+    def execute(self, task):
+        self._tasks[task.id] = task
+        inputs = task.inputs.copy()
+        inputs['ctx'] = task.context
+        self._results[task.id] = self._app.send_task(
+            task.operation_mapping,
+            kwargs=inputs,
+            task_id=task.id,
+            queue=self._get_queue(task))
+
+    def close(self):
+        self._stopped = True
+        if self._receiver:
+            self._receiver.should_stop = True
+        self._receiver_thread.join()
+
+    @staticmethod
+    def _get_queue(task):
+        return None if task else None # TODO
+
+    def _events_receiver(self):
+        with self._app.connection() as connection:
+            self._receiver = self._app.events.Receiver(connection, handlers={
+                'task-started': self._celery_task_started,
+                'task-succeeded': self._celery_task_succeeded,
+                'task-failed': self._celery_task_failed,
+            })
+            for _ in self._receiver.itercapture(limit=None, timeout=None, wakeup=True):
+                if not self._started_signaled:
+                    self._started_queue.put(True)
+                    self._started_signaled = True
+                if self._stopped:
+                    return
+
+    def _celery_task_started(self, event):
+        self._task_started(self._tasks[event['uuid']])
+
+    def _celery_task_succeeded(self, event):
+        task, _ = self._remove_task(event['uuid'])
+        self._task_succeeded(task)
+
+    def _celery_task_failed(self, event):
+        task, async_result = self._remove_task(event['uuid'])
+        try:
+            exception = async_result.result
+        except BaseException as e:
+            exception = RuntimeError(
+                'Could not de-serialize exception of task {0} --> {1}: {2}'
+                .format(task.name, type(e).__name__, str(e)))
+        self._task_failed(task, exception=exception)
+
+    def _remove_task(self, task_id):
+        return self._tasks.pop(task_id), self._results.pop(task_id)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/multiprocess.py b/aria/orchestrator/workflows/executor/multiprocess.py
new file mode 100644
index 0000000..e73ab09
--- /dev/null
+++ b/aria/orchestrator/workflows/executor/multiprocess.py
@@ -0,0 +1,98 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Multiprocess based executor
+"""
+
+import multiprocessing
+import threading
+
+import jsonpickle
+
+from aria.tools import module
+from .base import BaseExecutor
+
+
+class MultiprocessExecutor(BaseExecutor):
+    """
+    Executor which runs tasks in a multiprocess environment
+    """
+
+    def __init__(self, pool_size=1, *args, **kwargs):
+        super(MultiprocessExecutor, self).__init__(*args, **kwargs)
+        self._stopped = False
+        self._manager = multiprocessing.Manager()
+        self._queue = self._manager.Queue()
+        self._tasks = {}
+        self._listener_thread = threading.Thread(target=self._listener)
+        self._listener_thread.daemon = True
+        self._listener_thread.start()
+        self._pool = multiprocessing.Pool(processes=pool_size)
+
+    def execute(self, task):
+        self._tasks[task.id] = task
+        self._pool.apply_async(_multiprocess_handler, args=(
+            self._queue,
+            task.context,
+            task.id,
+            task.operation_mapping,
+            task.inputs))
+
+    def close(self):
+        self._pool.close()
+        self._stopped = True
+        self._pool.join()
+        self._listener_thread.join()
+
+    def _listener(self):
+        while not self._stopped:
+            try:
+                message = self._queue.get(timeout=1)
+                if message.type == 'task_started':
+                    self._task_started(self._tasks[message.task_id])
+                elif message.type == 'task_succeeded':
+                    self._task_succeeded(self._remove_task(message.task_id))
+                elif message.type == 'task_failed':
+                    self._task_failed(self._remove_task(message.task_id),
+                                      exception=jsonpickle.loads(message.exception))
+                else:
+                    # TODO: something
+                    raise RuntimeError()
+            # Daemon threads
+            except BaseException:
+                pass
+
+    def _remove_task(self, task_id):
+        return self._tasks.pop(task_id)
+
+
+class _MultiprocessMessage(object):
+
+    def __init__(self, type, task_id, exception=None):
+        self.type = type
+        self.task_id = task_id
+        self.exception = exception
+
+
+def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs):
+    queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
+    try:
+        task_func = module.load_attribute(operation_mapping)
+        task_func(ctx=ctx, **operation_inputs)
+        queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id))
+    except BaseException as e:
+        queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,
+                                       exception=jsonpickle.dumps(e)))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
new file mode 100644
index 0000000..fd7b302
--- /dev/null
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -0,0 +1,65 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Thread based executor
+"""
+
+import Queue
+import threading
+
+from aria.tools import module
+from .base import BaseExecutor
+
+
+class ThreadExecutor(BaseExecutor):
+    """
+    Executor which runs tasks in a separate thread
+    """
+
+    def __init__(self, pool_size=1, *args, **kwargs):
+        super(ThreadExecutor, self).__init__(*args, **kwargs)
+        self._stopped = False
+        self._queue = Queue.Queue()
+        self._pool = []
+        for i in range(pool_size):
+            name = 'ThreadExecutor-{index}'.format(index=i+1)
+            thread = threading.Thread(target=self._processor, name=name)
+            thread.daemon = True
+            thread.start()
+            self._pool.append(thread)
+
+    def execute(self, task):
+        self._queue.put(task)
+
+    def close(self):
+        self._stopped = True
+        for thread in self._pool:
+            thread.join()
+
+    def _processor(self):
+        while not self._stopped:
+            try:
+                task = self._queue.get(timeout=1)
+                self._task_started(task)
+                try:
+                    task_func = module.load_attribute(task.operation_mapping)
+                    task_func(ctx=task.context, **task.inputs)
+                    self._task_succeeded(task)
+                except BaseException as e:
+                    self._task_failed(task, exception=e)
+            # Daemon threads
+            except BaseException:
+                pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/storage/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py
index c5a7421..2d142a5 100644
--- a/aria/storage/__init__.py
+++ b/aria/storage/__init__.py
@@ -49,7 +49,7 @@ from .drivers import (
     FileSystemResourceDriver,
     FileSystemModelDriver,
 )
-from . import models
+from . import models, exceptions
 
 __all__ = (
     'ModelStorage',

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/storage/drivers.py
----------------------------------------------------------------------
diff --git a/aria/storage/drivers.py b/aria/storage/drivers.py
index 8b7d3af..1f96956 100644
--- a/aria/storage/drivers.py
+++ b/aria/storage/drivers.py
@@ -27,18 +27,16 @@ classes:
     * FileSystemResourceDriver - file system implementation for resource storage driver.
 """
 
+import distutils.dir_util # pylint: disable=no-name-in-module, import-error
 import os
 import shutil
-# pylint has an issue with distutils and virtualenvs: https://github.com/PyCQA/pylint/issues/73
-import distutils.dir_util # pylint: disable=no-name-in-module, import-error
 from functools import partial
 from multiprocessing import RLock
 
 import jsonpickle
 
-from ..exceptions import StorageError
 from ..logger import LoggerMixin
-
+from .exceptions import StorageError
 
 __all__ = (
     'ModelDriver',

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/storage/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/storage/exceptions.py b/aria/storage/exceptions.py
new file mode 100644
index 0000000..22dfc50
--- /dev/null
+++ b/aria/storage/exceptions.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from .. import exceptions
+
+
+class StorageError(exceptions.AriaError):
+    """
+    General storage exception
+    """
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/storage/structures.py
----------------------------------------------------------------------
diff --git a/aria/storage/structures.py b/aria/storage/structures.py
index 399922e..a833d99 100644
--- a/aria/storage/structures.py
+++ b/aria/storage/structures.py
@@ -27,14 +27,13 @@ classes:
     * Model - abstract model implementation.
 """
 import json
-from uuid import uuid4
 from itertools import count
+from uuid import uuid4
 
+from .exceptions import StorageError
 from ..logger import LoggerMixin
-from ..exceptions import StorageError
 from ..tools.validation import ValidatorMixin
 
-
 __all__ = (
     'uuid_generator',
     'Field',

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/tools/__init__.py
----------------------------------------------------------------------
diff --git a/aria/tools/__init__.py b/aria/tools/__init__.py
index ae1e83e..320b445 100644
--- a/aria/tools/__init__.py
+++ b/aria/tools/__init__.py
@@ -12,3 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+from .lru_cache import lru_cache
+from .module import load_attribute
+from .plugin import plugin_installer
+from .process import Process
+from .validation import validate_function_arguments, ValidatorMixin

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/tools/application.py
----------------------------------------------------------------------
diff --git a/aria/tools/application.py b/aria/tools/application.py
index 360ba33..b1a7fcc 100644
--- a/aria/tools/application.py
+++ b/aria/tools/application.py
@@ -18,15 +18,15 @@ Convenience storage related tools.
 # TODO rename module name
 """
 
-import os
 import json
+import os
 import shutil
 import tarfile
 import tempfile
 from datetime import datetime
 
+from aria.storage.exceptions import StorageError
 from aria.logger import LoggerMixin
-from aria.exceptions import StorageError
 
 
 class StorageManager(LoggerMixin):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/tools/process.py
----------------------------------------------------------------------
diff --git a/aria/tools/process.py b/aria/tools/process.py
index 5a3d8a0..b9586b6 100644
--- a/aria/tools/process.py
+++ b/aria/tools/process.py
@@ -23,7 +23,7 @@ from signal import SIGKILL
 from time import sleep
 
 from aria.logger import LoggerMixin
-from aria.exceptions import ExecutorException, ProcessException
+from aria.orchestrator.workflows.exceptions import ExecutorException, ProcessException
 
 
 class Process(LoggerMixin):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/workflows/__init__.py
----------------------------------------------------------------------
diff --git a/aria/workflows/__init__.py b/aria/workflows/__init__.py
deleted file mode 100644
index ae1e83e..0000000
--- a/aria/workflows/__init__.py
+++ /dev/null
@@ -1,14 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d24d8bf4/aria/workflows/api/__init__.py
----------------------------------------------------------------------
diff --git a/aria/workflows/api/__init__.py b/aria/workflows/api/__init__.py
deleted file mode 100644
index a3a17ee..0000000
--- a/aria/workflows/api/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Provides API for building tasks
-"""
-
-from . import task, task_graph
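
Note on the executor modules added in this commit: the following is a minimal, standalone sketch of the worker-thread pattern that ThreadExecutor appears to follow (a queue polled with a timeout, a stop flag, and started/succeeded/failed notifications). It is an illustration under assumptions, not ARIA code. It targets Python 3 (`queue` instead of the Python 2 `Queue` module used in the commit), and `MiniThreadExecutor`, its `events` list and the `execute(name, func, **inputs)` signature are hypothetical names introduced only for this example. Unlike the committed `close()`, the sketch drains the queue before stopping the workers.

```python
import queue
import threading


class MiniThreadExecutor(object):
    """Hypothetical, simplified stand-in for the ThreadExecutor pattern."""

    def __init__(self, pool_size=1):
        self.events = []                  # (event, task name) pairs, for illustration only
        self._lock = threading.Lock()     # guards self.events across worker threads
        self._stopped = False
        self._queue = queue.Queue()
        self._pool = []
        for i in range(pool_size):
            name = 'MiniThreadExecutor-{index}'.format(index=i + 1)
            thread = threading.Thread(target=self._processor, name=name)
            thread.daemon = True
            thread.start()
            self._pool.append(thread)

    def execute(self, name, func, **inputs):
        # Hand the task to whichever worker picks it up first.
        self._queue.put((name, func, inputs))

    def close(self):
        # Assumption of this sketch: wait for queued tasks before stopping.
        self._queue.join()
        self._stopped = True
        for thread in self._pool:
            thread.join()

    def _record(self, event, name):
        with self._lock:
            self.events.append((event, name))

    def _processor(self):
        while not self._stopped:
            try:
                name, func, inputs = self._queue.get(timeout=0.1)
            except queue.Empty:
                continue                  # nothing queued; re-check the stop flag
            self._record('started', name)
            try:
                func(**inputs)
                self._record('succeeded', name)
            except Exception:
                self._record('failed', name)
            finally:
                self._queue.task_done()
```

Catching only `queue.Empty` around the `get()` call keeps the polling loop from hiding unrelated errors, which is one alternative to the broad `except BaseException: pass` seen in the diff.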