ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: phase 1 fixes [Forced Update!]
Date Tue, 20 Jun 2017 13:07:13 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 3281ad952 -> 7762a492e (forced update)


phase 1 fixes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/7762a492
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/7762a492
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/7762a492

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 7762a492ef1c78266dc6bbb4fb56dc1913264fae
Parents: 507796e
Author: max-orlov <maxim@gigaspaces.com>
Authored: Tue Jun 20 15:34:57 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Tue Jun 20 16:07:09 2017 +0300

----------------------------------------------------------------------
 aria/modeling/orchestration.py                  |  14 +--
 aria/orchestrator/context/operation.py          |   2 +-
 aria/orchestrator/context/workflow.py           |   2 +-
 aria/orchestrator/workflow_runner.py            |   6 +-
 aria/orchestrator/workflows/api/task.py         |   6 +
 aria/orchestrator/workflows/core/engine.py      | 122 +++----------------
 .../workflows/core/events_handler.py            |  19 ++-
 aria/orchestrator/workflows/core/task.py        | 112 +++++++++++++++++
 aria/orchestrator/workflows/events_logging.py   |  56 ++++-----
 aria/orchestrator/workflows/executor/base.py    |   4 +-
 aria/orchestrator/workflows/executor/dry.py     |   2 +-
 tests/orchestrator/context/__init__.py          |   5 +-
 tests/orchestrator/context/test_serialize.py    |   4 +-
 .../orchestrator/execution_plugin/test_local.py |   5 +-
 tests/orchestrator/execution_plugin/test_ssh.py |   4 +-
 .../orchestrator/workflows/core/test_engine.py  |   8 +-
 .../orchestrator/workflows/core/test_events.py  |   6 +-
 .../test_task_graph_into_execution_graph.py     |  15 +--
 .../orchestrator/workflows/executor/__init__.py |   2 +-
 .../executor/test_process_executor_extension.py |   4 +-
 .../test_process_executor_tracked_changes.py    |   4 +-
 21 files changed, 206 insertions(+), 196 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/modeling/orchestration.py
----------------------------------------------------------------------
diff --git a/aria/modeling/orchestration.py b/aria/modeling/orchestration.py
index 007eefa..17d2476 100644
--- a/aria/modeling/orchestration.py
+++ b/aria/modeling/orchestration.py
@@ -304,10 +304,11 @@ class TaskBase(mixins.ModelMixin):
     started_at = Column(DateTime, default=None)
     ended_at = Column(DateTime, default=None)
     attempts_count = Column(Integer, default=1)
-    api_id = Column(String)
 
+    _api_id = Column(String)
     _executor = Column(PickleType)
     _context_cls = Column(PickleType)
+    _stub_type = Column(Enum(*STUB_TYPES))
 
     @declared_attr
     def logs(cls):
@@ -336,8 +337,6 @@ class TaskBase(mixins.ModelMixin):
     interface_name = Column(String)
     operation_name = Column(String)
 
-    stub_type = Column(Enum(*STUB_TYPES))
-
     @property
     def actor(self):
         """
@@ -410,21 +409,18 @@ class TaskBase(mixins.ModelMixin):
         return self.status in (self.SUCCESS, self.FAILED)
 
     def is_waiting(self):
-        if self.stub_type:
+        if self._stub_type:
             return not self.has_ended()
         else:
             return self.status in (self.PENDING, self.RETRYING)
 
     @classmethod
     def from_api_task(cls, api_task, executor, **kwargs):
-        from aria.orchestrator import context
         instantiation_kwargs = {}
 
         if hasattr(api_task.actor, 'outbound_relationships'):
-            context_cls = context.operation.NodeOperationContext
             instantiation_kwargs['node'] = api_task.actor
         elif hasattr(api_task.actor, 'source_node'):
-            context_cls = context.operation.RelationshipOperationContext
             instantiation_kwargs['relationship'] = api_task.actor
         else:
             raise RuntimeError('No operation context could be created for {actor.model_cls}'
@@ -445,8 +441,8 @@ class TaskBase(mixins.ModelMixin):
                 'plugin': api_task.plugin,
                 'function': api_task.function,
                 'arguments': api_task.arguments,
-                'api_id': api_task.id,
-                '_context_cls': context_cls,
+                '_api_id': api_task.id,
+                '_context_cls': api_task._context_cls,
                 '_executor': executor,
             }
         )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 07bf297..d43b847 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -109,7 +109,7 @@ class BaseOperationContext(common.BaseContext):
 
     @property
     @contextmanager
-    def track_changes(self):
+    def persist_changes(self):
         yield
         self.model.task.update(self.task)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 4b7573f..aa5a786 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -109,7 +109,7 @@ class WorkflowContext(BaseContext):
 
     @property
     @contextmanager
-    def track_changes(self):
+    def persist_changes(self):
         yield
         self._model.execution.update(self.execution)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
index c30ec4b..66eec4a 100644
--- a/aria/orchestrator/workflow_runner.py
+++ b/aria/orchestrator/workflow_runner.py
@@ -24,7 +24,7 @@ from datetime import datetime
 from . import exceptions
 from .context.workflow import WorkflowContext
 from .workflows import builtin
-from .workflows.core import engine
+from .workflows.core import engine, task
 from .workflows.executor.process import ProcessExecutor
 from ..modeling import models
 from ..modeling import utils as modeling_utils
@@ -87,12 +87,12 @@ class WorkflowRunner(object):
         execution_inputs_dict = dict(inp.unwrapped for inp in self.execution.inputs.values())
 
         self._tasks_graph = workflow_fn(ctx=self._workflow_context, **execution_inputs_dict)
-        engine.construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
+        task.construct_execution_tasks(self.execution, self._tasks_graph, executor.__class__)
 
         # Update the state
         self._model_storage.execution.update(execution)
 
-        self._engine = engine.Engine(default_executor=executor)
+        self._engine = engine.Engine(executors={executor.__class__: executor})
 
     @property
     def execution_id(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index ca125a8..0e80e8a 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -140,6 +140,12 @@ class OperationTask(BaseTask):
         self.arguments = modeling_utils.merge_parameter_values(arguments,
                                                                operation.arguments,
                                                                model_cls=models.Argument)
+        if isinstance(self.actor, models.Node):
+            self._context_cls = context.operation.NodeOperationContext
+        elif isinstance(self.actor, models.Relationship):
+            self._context_cls = context.operation.RelationshipOperationContext
+        else:
+            self._context_cls = context.operation.BaseOperationContext
 
     def __repr__(self):
         return self.name

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index cec561f..7d542d0 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -35,25 +35,25 @@ class Engine(logger.LoggerMixin):
     The workflow engine. Executes workflows
     """
 
-    def __init__(self, default_executor, **kwargs):
+    def __init__(self, executors, **kwargs):
         super(Engine, self).__init__(**kwargs)
-        self._executors = {default_executor.__class__: default_executor}
-        self._executing_tasks = []
+        self._executors = executors.copy()
 
     def execute(self, ctx):
         """
         execute the workflow
         """
+        executing_tasks = []
         try:
             events.start_workflow_signal.send(ctx)
             while True:
                 cancel = self._is_cancel(ctx)
                 if cancel:
                     break
-                for task in self._ended_tasks(ctx):
-                    self._handle_ended_tasks(ctx, task)
+                for task in self._ended_tasks(ctx, executing_tasks):
+                    self._handle_ended_tasks(ctx, task, executing_tasks)
                 for task in self._executable_tasks(ctx):
-                    self._handle_executable_task(ctx, task)
+                    self._handle_executable_task(ctx, task, executing_tasks)
                 if self._all_tasks_consumed(ctx):
                     break
                 else:
@@ -77,7 +77,7 @@ class Engine(logger.LoggerMixin):
 
     @staticmethod
     def _is_cancel(ctx):
-        execution = ctx.model.execution.update(ctx.execution)
+        execution = ctx.model.execution.refresh(ctx.execution)
         return execution.status in (models.Execution.CANCELLING, models.Execution.CANCELLED)
 
     def _executable_tasks(self, ctx):
@@ -88,8 +88,8 @@ class Engine(logger.LoggerMixin):
             not self._task_has_dependencies(ctx, task)
         )
 
-    def _ended_tasks(self, ctx):
-        for task in self._executing_tasks:
+    def _ended_tasks(self, ctx, executing_tasks):
+        for task in executing_tasks:
             if task.has_ended() and task in ctx._graph:
                 yield task
 
@@ -106,12 +106,11 @@ class Engine(logger.LoggerMixin):
         for task in ctx.execution.tasks:
             yield ctx.model.task.refresh(task)
 
-    def _handle_executable_task(self, ctx, task):
-        if task._executor not in self._executors:
-            self._executors[task._executor] = task._executor()
+    def _handle_executable_task(self, ctx, task, executing_tasks):
         task_executor = self._executors[task._executor]
 
-        context_cls = task._context_cls or operation.BaseOperationContext
+        # If the task is a stub, a default context is provided, else it should hold the context cls
+        context_cls = operation.BaseOperationContext if task._stub_type else task._context_cls
         op_ctx = context_cls(
             model_storage=ctx.model,
             resource_storage=ctx.resource,
@@ -123,104 +122,15 @@ class Engine(logger.LoggerMixin):
             name=task.name
         )
 
-        self._executing_tasks.append(task)
+        executing_tasks.append(task)
 
-        if not task.stub_type:
+        if not task._stub_type:
             events.sent_task_signal.send(op_ctx)
         task_executor.execute(op_ctx)
 
-    def _handle_ended_tasks(self, ctx, task):
-        self._executing_tasks.remove(task)
+    def _handle_ended_tasks(self, ctx, task, executing_tasks):
+        executing_tasks.remove(task)
         if task.status == models.Task.FAILED and not task.ignore_failure:
             raise exceptions.ExecutorException('Workflow failed')
         else:
             ctx._graph.remove_node(task)
-
-
-def construct_execution_tasks(execution,
-                              task_graph,
-                              default_executor,
-                              stub_executor=executor.base.StubTaskExecutor,
-                              start_stub_type=models.Task.START_WORKFLOW,
-                              end_stub_type=models.Task.END_WORKFLOW,
-                              depends_on=()):
-    """
-    Translates the user graph to the execution graph
-    :param task_graph: The user's graph
-    :param start_stub_type: internal use
-    :param end_stub_type: internal use
-    :param depends_on: internal use
-    """
-    depends_on = list(depends_on)
-
-    # Insert start marker
-    start_task = models.Task(api_id=_start_graph_suffix(task_graph.id),
-                             _executor=stub_executor,
-                             execution=execution,
-                             stub_type=start_stub_type,
-                             dependencies=depends_on)
-
-    for task in task_graph.topological_order(reverse=True):
-        operation_dependencies = _get_tasks_from_dependencies(
-            execution, task_graph.get_dependencies(task), [start_task])
-
-        if isinstance(task, api.task.OperationTask):
-            models.Task.from_api_task(api_task=task,
-                                      executor=default_executor,
-                                      dependencies=operation_dependencies)
-
-        elif isinstance(task, api.task.WorkflowTask):
-            # Build the graph recursively while adding start and end markers
-            construct_execution_tasks(
-                execution=execution,
-                task_graph=task,
-                default_executor=default_executor,
-                stub_executor=stub_executor,
-                start_stub_type=models.Task.START_SUBWROFKLOW,
-                end_stub_type=models.Task.END_SUBWORKFLOW,
-                depends_on=operation_dependencies
-            )
-        elif isinstance(task, api.task.StubTask):
-            models.Task(api_id=task.id,
-                        _executor=stub_executor,
-                        execution=execution,
-                        stub_type=models.Task.STUB,
-                        dependencies=operation_dependencies)
-        else:
-            raise RuntimeError('Undefined state')
-
-    # Insert end marker
-    models.Task(api_id=_end_graph_suffix(task_graph.id),
-                _executor=stub_executor,
-                execution=execution,
-                stub_type=end_stub_type,
-                dependencies=_get_non_dependent_tasks(execution) or [start_task])
-
-
-def _start_graph_suffix(api_id):
-    return '{0}-Start'.format(api_id)
-
-
-def _end_graph_suffix(api_id):
-    return '{0}-End'.format(api_id)
-
-
-def _get_non_dependent_tasks(execution):
-    dependency_tasks = set()
-    for task in execution.tasks:
-        dependency_tasks.update(task.dependencies)
-    return list(set(execution.tasks) - set(dependency_tasks))
-
-
-def _get_tasks_from_dependencies(execution, dependencies, default=()):
-    """
-    Returns task list from dependencies.
-    """
-    tasks = []
-    for dependency in dependencies:
-        if getattr(dependency, 'actor', False):
-            dependency_name = dependency.id
-        else:
-            dependency_name = _end_graph_suffix(dependency.id)
-        tasks.extend(task for task in execution.tasks if task.api_id == dependency_name)
-    return tasks or default

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 3a780d5..2d71d2a 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -31,13 +31,13 @@ from ... import exceptions
 
 @events.sent_task_signal.connect
 def _task_sent(ctx, *args, **kwargs):
-    with ctx.track_changes:
+    with ctx.persist_changes:
         ctx.task.status = ctx.task.SENT
 
 
 @events.start_task_signal.connect
 def _task_started(ctx, *args, **kwargs):
-    with ctx.track_changes:
+    with ctx.persist_changes:
         ctx.task.started_at = datetime.utcnow()
         ctx.task.status = ctx.task.STARTED
         _update_node_state_if_necessary(ctx, is_transitional=True)
@@ -45,7 +45,7 @@ def _task_started(ctx, *args, **kwargs):
 
 @events.on_failure_task_signal.connect
 def _task_failed(ctx, exception, *args, **kwargs):
-    with ctx.track_changes:
+    with ctx.persist_changes:
         should_retry = all([
             not isinstance(exception, exceptions.TaskAbortException),
             ctx.task.attempts_count < ctx.task.max_attempts or
@@ -71,7 +71,7 @@ def _task_failed(ctx, exception, *args, **kwargs):
 
 @events.on_success_task_signal.connect
 def _task_succeeded(ctx, *args, **kwargs):
-    with ctx.track_changes:
+    with ctx.persist_changes:
         ctx.task.ended_at = datetime.utcnow()
         ctx.task.status = ctx.task.SUCCESS
 
@@ -80,7 +80,7 @@ def _task_succeeded(ctx, *args, **kwargs):
 
 @events.start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
+    with workflow_context.persist_changes:
         execution = workflow_context.execution
         # the execution may already be in the process of cancelling
         if execution.status in (execution.CANCELLING, execution.CANCELLED):
@@ -91,7 +91,7 @@ def _workflow_started(workflow_context, *args, **kwargs):
 
 @events.on_failure_workflow_signal.connect
 def _workflow_failed(workflow_context, exception, *args, **kwargs):
-    with workflow_context.track_changes:
+    with workflow_context.persist_changes:
         execution = workflow_context.execution
         execution.error = str(exception)
         execution.status = execution.FAILED
@@ -100,7 +100,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs):
 
 @events.on_success_workflow_signal.connect
 def _workflow_succeeded(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
+    with workflow_context.persist_changes:
         execution = workflow_context.execution
         execution.status = execution.SUCCEEDED
         execution.ended_at = datetime.utcnow()
@@ -108,7 +108,7 @@ def _workflow_succeeded(workflow_context, *args, **kwargs):
 
 @events.on_cancelled_workflow_signal.connect
 def _workflow_cancelled(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
+    with workflow_context.persist_changes:
         execution = workflow_context.execution
         # _workflow_cancelling function may have called this function already
         if execution.status == execution.CANCELLED:
@@ -123,7 +123,7 @@ def _workflow_cancelled(workflow_context, *args, **kwargs):
 
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):
-    with workflow_context.track_changes:
+    with workflow_context.persist_changes:
         execution = workflow_context.execution
         if execution.status == execution.PENDING:
             return _workflow_cancelled(workflow_context=workflow_context)
@@ -132,7 +132,6 @@ def _workflow_cancelling(workflow_context, *args, **kwargs):
             _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
         else:
             execution.status = execution.CANCELLING
-            workflow_context.execution = execution
 
 
 def _update_node_state_if_necessary(ctx, is_transitional=False):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
new file mode 100644
index 0000000..cf1b7bc
--- /dev/null
+++ b/aria/orchestrator/workflows/core/task.py
@@ -0,0 +1,112 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+The workflow engine. Executes workflows
+"""
+
+from ....modeling import models
+from .. import executor, api
+
+
+def construct_execution_tasks(execution,
+                              task_graph,
+                              default_executor,
+                              stub_executor=executor.base.StubTaskExecutor,
+                              start_stub_type=models.Task.START_WORKFLOW,
+                              end_stub_type=models.Task.END_WORKFLOW,
+                              depends_on=()):
+    """
+    Translates the user graph to the execution graph
+    :param task_graph: The user's graph
+    :param start_stub_type: internal use
+    :param end_stub_type: internal use
+    :param depends_on: internal use
+    """
+    depends_on = list(depends_on)
+
+    # Insert start marker
+    start_task = models.Task(execution=execution,
+                             dependencies=depends_on,
+                             _api_id=_start_graph_suffix(task_graph.id),
+                             _stub_type=start_stub_type,
+                             _executor=stub_executor)
+
+    for task in task_graph.topological_order(reverse=True):
+        operation_dependencies = _get_tasks_from_dependencies(
+            execution, task_graph.get_dependencies(task), [start_task])
+
+        if isinstance(task, api.task.OperationTask):
+            models.Task.from_api_task(api_task=task,
+                                      executor=default_executor,
+                                      dependencies=operation_dependencies)
+
+        elif isinstance(task, api.task.WorkflowTask):
+            # Build the graph recursively while adding start and end markers
+            construct_execution_tasks(
+                execution=execution,
+                task_graph=task,
+                default_executor=default_executor,
+                stub_executor=stub_executor,
+                start_stub_type=models.Task.START_SUBWROFKLOW,
+                end_stub_type=models.Task.END_SUBWORKFLOW,
+                depends_on=operation_dependencies
+            )
+        elif isinstance(task, api.task.StubTask):
+            models.Task(execution=execution,
+                        dependencies=operation_dependencies,
+                        _api_id=task.id,
+                        _executor=stub_executor,
+                        _stub_type=models.Task.STUB,
+                        )
+        else:
+            raise RuntimeError('Undefined state')
+
+    # Insert end marker
+    models.Task(dependencies=_get_non_dependent_tasks(execution) or [start_task],
+                execution=execution,
+                _api_id=_end_graph_suffix(task_graph.id),
+                _executor=stub_executor,
+                _stub_type=end_stub_type)
+
+
+def _start_graph_suffix(api_id):
+    return '{0}-Start'.format(api_id)
+
+
+def _end_graph_suffix(api_id):
+    return '{0}-End'.format(api_id)
+
+
+def _get_non_dependent_tasks(execution):
+    tasks_with_dependencies = set()
+    for task in execution.tasks:
+        tasks_with_dependencies.update(task.dependencies)
+    return list(set(execution.tasks) - set(tasks_with_dependencies))
+
+
+def _get_tasks_from_dependencies(execution, dependencies, default=()):
+    """
+    Returns task list from dependencies.
+    """
+    tasks = []
+    for dependency in dependencies:
+        if getattr(dependency, 'actor', False):
+            # This is
+            dependency_name = dependency.id
+        else:
+            dependency_name = _end_graph_suffix(dependency.id)
+        tasks.extend(task for task in execution.tasks if task._api_id == dependency_name)
+    return tasks or default

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 12aebab..4cee867 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -35,66 +35,54 @@ def _get_task_name(task):
 
 @events.start_task_signal.connect
 def _start_task_handler(ctx, **kwargs):
-    with ctx.track_changes:
-        # If the task has no function this is an empty task.
-        if ctx.task.function:
-            suffix = 'started...'
-            logger = ctx.logger.info
-        else:
-            suffix = 'has no implementation'
-            logger = ctx.logger.debug
+    # If the task has no function this is an empty task.
+    if ctx.task.function:
+        suffix = 'started...'
+        logger = ctx.logger.info
+    else:
+        suffix = 'has no implementation'
+        logger = ctx.logger.debug
 
-        logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
-            name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
+    logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
+        name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
 
 
 @events.on_success_task_signal.connect
 def _success_task_handler(ctx, **kwargs):
-    with ctx.track_changes:
-        if not ctx.task.function:
-            return
-        ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
-                        .format(name=_get_task_name(ctx.task), task=ctx.task))
+    if not ctx.task.function:
+        return
+    ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
+                    .format(name=_get_task_name(ctx.task), task=ctx.task))
 
 
 @events.on_failure_task_signal.connect
 def _failure_operation_handler(ctx, traceback, **kwargs):
-    with ctx.track_changes:
-        ctx.logger.error(
-            '{name} {task.interface_name}.{task.operation_name} failed'
-            .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
-        )
+    ctx.logger.error(
+        '{name} {task.interface_name}.{task.operation_name} failed'
+        .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
+    )
 
 
 @events.start_workflow_signal.connect
 def _start_workflow_handler(context, **kwargs):
-    with context.track_changes:
-        context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
+    context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
 
 
 @events.on_failure_workflow_signal.connect
 def _failure_workflow_handler(context, **kwargs):
-    with context.track_changes:
-        context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
+    context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
 
 
 @events.on_success_workflow_signal.connect
 def _success_workflow_handler(context, **kwargs):
-    with context.track_changes:
-        context.logger.info(
-            "'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context)
-        )
+    context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context))
 
 
 @events.on_cancelled_workflow_signal.connect
 def _cancel_workflow_handler(context, **kwargs):
-    with context.track_changes:
-        context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
+    context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
 
 
 @events.on_cancelling_workflow_signal.connect
 def _cancelling_workflow_handler(context, **kwargs):
-    with context.track_changes:
-        context.logger.info(
-            "Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context)
-        )
+    context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 9e1ce7e..257d12c 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -33,7 +33,7 @@ class BaseExecutor(logger.LoggerMixin):
         Execute a task
         :param task: task to execute
         """
-        with ctx.track_changes:
+        with ctx.persist_changes:
             if ctx.task.function:
                 self._execute(ctx)
             else:
@@ -64,5 +64,5 @@ class BaseExecutor(logger.LoggerMixin):
 
 class StubTaskExecutor(BaseExecutor):                                                   # pylint: disable=abstract-method
     def execute(self, ctx, *args, **kwargs):
-        with ctx.track_changes:
+        with ctx.persist_changes:
             ctx.task.status = ctx.task.SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index 88d2e12..9d86125 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -26,7 +26,7 @@ class DryExecutor(base.BaseExecutor):
     Executor which dry runs tasks - prints task information without causing any side effects
     """
     def execute(self, ctx):
-        with ctx.track_changes:
+        with ctx.persist_changes:
             # updating the task manually instead of calling self._task_started(task),
             # to avoid any side effects raising that event might cause
             ctx.task.started_at = datetime.utcnow()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/tests/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/__init__.py b/tests/orchestrator/context/__init__.py
index abe92b9..d6c5d26 100644
--- a/tests/orchestrator/context/__init__.py
+++ b/tests/orchestrator/context/__init__.py
@@ -15,8 +15,7 @@
 
 import sys
 
-from aria.orchestrator.workflows.core import engine
-from aria.orchestrator import workflow_runner
+from aria.orchestrator.workflows.core import engine, task
 
 
 def op_path(func, module_path=None):
@@ -27,7 +26,7 @@ def op_path(func, module_path=None):
 def execute(workflow_func, workflow_context, executor):
     graph = workflow_func(ctx=workflow_context)
 
-    engine.construct_execution_tasks(workflow_context.execution, graph, executor.__class__)
+    task.construct_execution_tasks(workflow_context.execution, graph, executor.__class__)
     workflow_context.execution = workflow_context.execution
     eng = engine.Engine(executor)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 84d8952..ee02b41 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -16,7 +16,7 @@
 import pytest
 
 from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator import workflow, operation
 import tests
@@ -48,7 +48,7 @@ def test_serialize_operation_context(context, executor, tmpdir):
     context.model.node.update(node)
 
     graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    engine.construct_execution_tasks(context.execution, graph, executor.__class__)
+    task.construct_execution_tasks(context.execution, graph, executor.__class__)
     context.execution = context.execution
     eng = engine.Engine(executor)
     eng.execute(context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index cb1503b..8f48e4a 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -28,7 +28,7 @@ from aria.orchestrator.execution_plugin.exceptions import ProcessException
 from aria.orchestrator.execution_plugin import local
 from aria.orchestrator.execution_plugin import constants
 from aria.orchestrator.workflows.executor import process
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 
 from tests import mock
 from tests import storage
@@ -500,8 +500,7 @@ if __name__ == '__main__':
                 arguments=arguments))
             return graph
         tasks_graph = mock_workflow(ctx=workflow_context)  # pylint: disable=no-value-for-parameter
-        engine.construct_execution_tasks(
-            workflow_context.execution, tasks_graph, executor.__class__)
+        task.construct_execution_tasks(workflow_context.execution, tasks_graph, executor.__class__)
         workflow_context.execution = workflow_context.execution
         eng = engine.Engine(executor)
         eng.execute(workflow_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 2eb3c0b..bfc0b98 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -29,7 +29,7 @@ from aria.orchestrator import events
 from aria.orchestrator import workflow
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.executor import process
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.exceptions import ExecutorException
 from aria.orchestrator.exceptions import TaskAbortException, TaskRetryException
 from aria.orchestrator.execution_plugin import operations
@@ -254,7 +254,7 @@ class TestWithActualSSHServer(object):
             graph.sequence(*ops)
             return graph
         tasks_graph = mock_workflow(ctx=self._workflow_context)  # pylint: disable=no-value-for-parameter
-        engine.construct_execution_tasks(
+        task.construct_execution_tasks(
             self._workflow_context.execution, tasks_graph, self._executor.__class__)
         self._workflow_context.execution = self._workflow_context.execution
         eng = engine.Engine(self._executor)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 7bdac67..f6f638f 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -28,7 +28,7 @@ from aria.orchestrator.workflows import (
     api,
     exceptions,
 )
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.executor import thread
 
 from tests import mock, storage
@@ -51,10 +51,10 @@ class BaseTest(object):
     def _engine(workflow_func, workflow_context, executor):
         graph = workflow_func(ctx=workflow_context)
         execution = workflow_context.execution
-        engine.construct_execution_tasks(execution, graph, executor.__class__)
+        task.construct_execution_tasks(execution, graph, executor.__class__)
         workflow_context.execution = execution
 
-        return engine.Engine(default_executor=executor)
+        return engine.Engine(executors=executor)
 
     @staticmethod
     def _create_interface(ctx, func, arguments=None):
@@ -101,7 +101,7 @@ class BaseTest(object):
     @pytest.fixture(autouse=True)
     def signals_registration(self, ):
         def sent_task_handler(ctx, *args, **kwargs):
-            if ctx.task.stub_type is None:
+            if ctx.task._stub_type is None:
                 calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
                 global_test_holder['sent_task_signal_calls'] = calls + 1
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/tests/orchestrator/workflows/core/test_events.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_events.py b/tests/orchestrator/workflows/core/test_events.py
index f0699df..c30b373 100644
--- a/tests/orchestrator/workflows/core/test_events.py
+++ b/tests/orchestrator/workflows/core/test_events.py
@@ -16,7 +16,7 @@
 import pytest
 
 from aria.orchestrator.decorators import operation, workflow
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.executor.thread import ThreadExecutor
 from aria.orchestrator.workflows import api
 from aria.modeling.service_instance import NodeBase
@@ -113,13 +113,13 @@ def run_operation_on_node(ctx, op_name, interface_name):
         operation_name=op_name,
         operation_kwargs=dict(function='{name}.{func.__name__}'.format(name=__name__, func=func)))
     node.interfaces[interface.name] = interface
-    engine.construct_execution_tasks(
+    task.construct_execution_tasks(
         ctx.execution,
         single_operation_workflow(ctx, node=node, interface_name=interface_name, op_name=op_name),
         ThreadExecutor)
     ctx.execution = ctx.execution
 
-    eng = engine.Engine(default_executor=ThreadExecutor())
+    eng = engine.Engine(executors=ThreadExecutor())
     eng.execute(ctx)
     return node
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
index 569e8be..faac35f 100644
--- a/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
+++ b/tests/orchestrator/workflows/core/test_task_graph_into_execution_graph.py
@@ -21,6 +21,7 @@ from aria.orchestrator.workflows import (
     api,
     core
 )
+from aria.orchestrator.workflows.core import task
 from aria.orchestrator.workflows.executor import base
 from tests import mock
 from tests import storage
@@ -70,7 +71,7 @@ def test_task_graph_into_execution_graph(tmpdir):
     # Direct check
     execution = workflow_context.model.execution.list()[0]
 
-    core.engine.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
+    task.construct_execution_tasks(execution, test_task_graph, base.StubTaskExecutor)
     workflow_context.execution = execution
 
     execution_tasks = topological_sort(workflow_context._graph)
@@ -87,23 +88,23 @@ def test_task_graph_into_execution_graph(tmpdir):
         '{0}-End'.format(test_task_graph.id)
     ]
 
-    assert expected_tasks_names == [t.api_id for t in execution_tasks]
+    assert expected_tasks_names == [t._api_id for t in execution_tasks]
     assert all(isinstance(task, models.Task) for task in execution_tasks)
     execution_tasks = iter(execution_tasks)
 
-    assert next(execution_tasks).stub_type == models.Task.START_WORKFLOW
+    assert next(execution_tasks)._stub_type == models.Task.START_WORKFLOW
     _assert_execution_is_api_task(next(execution_tasks), simple_before_task)
-    assert next(execution_tasks).stub_type == models.Task.START_SUBWROFKLOW
+    assert next(execution_tasks)._stub_type == models.Task.START_SUBWROFKLOW
     _assert_execution_is_api_task(next(execution_tasks), inner_task)
-    assert next(execution_tasks).stub_type == models.Task.END_SUBWORKFLOW
+    assert next(execution_tasks)._stub_type == models.Task.END_SUBWORKFLOW
     _assert_execution_is_api_task(next(execution_tasks), simple_after_task)
-    assert next(execution_tasks).stub_type == models.Task.END_WORKFLOW
+    assert next(execution_tasks)._stub_type == models.Task.END_WORKFLOW
 
     storage.release_sqlite_storage(workflow_context.model)
 
 
 def _assert_execution_is_api_task(execution_task, api_task):
-    assert execution_task.api_id == api_task.id
+    assert execution_task._api_id == api_task.id
     assert execution_task.name == api_task.name
     assert execution_task.function == api_task.function
     assert execution_task.actor == api_task.actor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 97b24f3..83584a6 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -53,7 +53,7 @@ class MockContext(object):
 
     @property
     @contextmanager
-    def track_changes(self):
+    def persist_changes(self):
         yield
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 8ede925..4566ec3 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -17,7 +17,7 @@ import pytest
 
 from aria import extension
 from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator import workflow, operation
 
@@ -57,7 +57,7 @@ def test_decorate_extension(context, executor):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    engine.construct_execution_tasks(context.execution, graph, executor.__class__)
+    task.construct_execution_tasks(context.execution, graph, executor.__class__)
     context.execution = context.execution
     eng = engine.Engine(executor)
     eng.execute(context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/7762a492/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
index 9eb8916..0c15c8a 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -18,7 +18,7 @@ import copy
 import pytest
 
 from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.core import engine, task
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator import workflow, operation
 from aria.orchestrator.workflows import exceptions
@@ -107,7 +107,7 @@ def _run_workflow(context, executor, op_func, arguments=None):
         graph.add_tasks(task)
         return graph
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
-    engine.construct_execution_tasks(context.execution, graph, executor.__class__)
+    task.construct_execution_tasks(context.execution, graph, executor.__class__)
     context.execution = context.execution
     eng = engine.Engine(executor)
     eng.execute(context)



Mime
View raw message