ariatosca-dev mailing list archives

From mxm...@apache.org
Subject incubator-ariatosca git commit: wip3
Date Thu, 15 Jun 2017 11:16:47 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-278-Remove-core-tasks 4d9112085 -> 21120aa64


wip3


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/21120aa6
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/21120aa6
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/21120aa6

Branch: refs/heads/ARIA-278-Remove-core-tasks
Commit: 21120aa644ea737d1cd8bab363bcf74df43a3bf5
Parents: 4d91120
Author: max-orlov <maxim@gigaspaces.com>
Authored: Thu Jun 15 14:16:43 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Thu Jun 15 14:16:43 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/context/workflow.py           |   7 +
 aria/orchestrator/workflows/core/engine.py      |   5 +-
 .../workflows/core/events_handler.py            | 135 ++++++++++---------
 aria/orchestrator/workflows/events_logging.py   |  56 +++++---
 aria/orchestrator/workflows/executor/base.py    |   3 +-
 tests/orchestrator/test_workflow_runner.py      |   2 +-
 .../orchestrator/workflows/core/test_engine.py  |   4 +-
 7 files changed, 117 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/21120aa6/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 5404df5..6a7fb1b 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -107,6 +107,13 @@ class WorkflowContext(BaseContext):
 
         return self._execution_graph
 
+    @property
+    @contextmanager
+    def track_execution(self):
+        self._model.execution.update(self.execution)
+        yield
+        self._model.execution.update(self.execution)
+
 
 class _CurrentContext(threading.local):
     """

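The track_execution property added above combines @property with @contextmanager, so every attribute access yields a fresh context manager that persists the execution model on entry and again on exit of the with-block. A minimal standalone sketch of that pattern, with a hypothetical FakeExecutionStore standing in for ARIA's storage layer:

    from contextlib import contextmanager


    class Execution(object):
        def __init__(self):
            self.status = 'pending'


    class FakeExecutionStore(object):
        """Hypothetical stand-in for the model storage; records each update."""
        def __init__(self):
            self.updates = []

        def update(self, execution):
            # In ARIA this would flush the execution model to storage.
            self.updates.append(execution.status)


    class Context(object):
        def __init__(self):
            self.execution = Execution()
            self._store = FakeExecutionStore()

        @property
        @contextmanager
        def track_execution(self):
            # contextmanager is applied first, property second: each access
            # builds a new generator-based context manager around the block.
            self._store.update(self.execution)
            yield
            self._store.update(self.execution)


    ctx = Context()
    with ctx.track_execution:
        ctx.execution.status = 'started'
    assert ctx._store.updates == ['pending', 'started']

Note that, as written in the commit, the trailing update is skipped if the block raises, since there is no try/finally around the yield.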
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/21120aa6/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index ade3661..9ac7215 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -110,9 +110,6 @@ class Engine(logger.LoggerMixin):
             yield task
 
     def _handle_executable_task(self, ctx, task):
-        if not task.stub_type:
-            events.sent_task_signal.send(task)
-
         if task._executor not in self._executors:
             self._executors[task._executor] = task._executor()
         executor = self._executors[task._executor]
@@ -129,6 +126,8 @@ class Engine(logger.LoggerMixin):
             name=task.name
         )
 
+        if not task.stub_type:
+            events.sent_task_signal.send(op_ctx)
         executor.execute(op_ctx)
 
     @staticmethod

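The engine change above also alters the sent_task_signal contract: the signal now fires after the operation context is built, and handlers receive that context rather than the bare task (see the matching handler and test updates below). A hedged sketch of the new wiring using blinker, which ARIA's events signals are based on; the Fake* classes here are illustrative stand-ins:

    import blinker

    sent_task_signal = blinker.signal('sent_task_signal')


    class FakeTask(object):
        SENT = 'sent'

        def __init__(self):
            self.status = 'pending'
            self.stub_type = None


    class FakeOperationContext(object):
        def __init__(self, task):
            self.task = task


    @sent_task_signal.connect
    def _task_sent(ctx, *args, **kwargs):
        # New handler signature: the sender is the context, and the task is
        # reached through it.
        ctx.task.status = ctx.task.SENT


    op_ctx = FakeOperationContext(FakeTask())
    if not op_ctx.task.stub_type:
        sent_task_signal.send(op_ctx)
    assert op_ctx.task.status == 'sent'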
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/21120aa6/aria/orchestrator/workflows/core/events_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/events_handler.py b/aria/orchestrator/workflows/core/events_handler.py
index 8b217f5..b9d467d 100644
--- a/aria/orchestrator/workflows/core/events_handler.py
+++ b/aria/orchestrator/workflows/core/events_handler.py
@@ -30,104 +30,109 @@ from ... import exceptions
 
 
 @events.sent_task_signal.connect
-def _task_sent(task, *args, **kwargs):
-    task.status = task.SENT
+def _task_sent(ctx, *args, **kwargs):
+    with ctx.track_task:
+        ctx.task.status = ctx.task.SENT
 
 
 @events.start_task_signal.connect
 def _task_started(ctx, *args, **kwargs):
-    ctx.task.started_at = datetime.utcnow()
-    ctx.task.status = ctx.task.STARTED
-    _update_node_state_if_necessary(ctx, is_transitional=True)
+    with ctx.track_task:
+        ctx.task.started_at = datetime.utcnow()
+        ctx.task.status = ctx.task.STARTED
+        _update_node_state_if_necessary(ctx, is_transitional=True)
 
 
 @events.on_failure_task_signal.connect
 def _task_failed(ctx, exception, *args, **kwargs):
-    should_retry = all([
-        not isinstance(exception, exceptions.TaskAbortException),
-        ctx.task.attempts_count < ctx.task.max_attempts or
-        ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
-        # ignore_failure check here means the task will not be retried and it will be marked
-        # as failed. The engine will also look at ignore_failure so it won't fail the
-        # workflow.
-        not ctx.task.ignore_failure
-    ])
-    if should_retry:
-        retry_interval = None
-        if isinstance(exception, exceptions.TaskRetryException):
-            retry_interval = exception.retry_interval
-        if retry_interval is None:
-            retry_interval = ctx.task.retry_interval
-        ctx.task.status = ctx.task.RETRYING
-        ctx.task.attempts_count += 1
-        ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
-    else:
-        ctx.task.ended_at = datetime.utcnow()
-        ctx.task.status = ctx.task.FAILED
+    with ctx.track_task:
+        should_retry = all([
+            not isinstance(exception, exceptions.TaskAbortException),
+            ctx.task.attempts_count < ctx.task.max_attempts or
+            ctx.task.max_attempts == ctx.task.INFINITE_RETRIES,
+            # ignore_failure check here means the task will not be retried and it will be marked
+            # as failed. The engine will also look at ignore_failure so it won't fail the
+            # workflow.
+            not ctx.task.ignore_failure
+        ])
+        if should_retry:
+            retry_interval = None
+            if isinstance(exception, exceptions.TaskRetryException):
+                retry_interval = exception.retry_interval
+            if retry_interval is None:
+                retry_interval = ctx.task.retry_interval
+            ctx.task.status = ctx.task.RETRYING
+            ctx.task.attempts_count += 1
+            ctx.task.due_at = datetime.utcnow() + timedelta(seconds=retry_interval)
+        else:
+            ctx.task.ended_at = datetime.utcnow()
+            ctx.task.status = ctx.task.FAILED
 
 
 @events.on_success_task_signal.connect
 def _task_succeeded(ctx, *args, **kwargs):
-    ctx.task.ended_at = datetime.utcnow()
-    ctx.task.status = ctx.task.SUCCESS
+    with ctx.track_task:
+        ctx.task.ended_at = datetime.utcnow()
+        ctx.task.status = ctx.task.SUCCESS
 
-    _update_node_state_if_necessary(ctx)
+        _update_node_state_if_necessary(ctx)
 
 
 @events.start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
-    execution = workflow_context.execution
-    # the execution may already be in the process of cancelling
-    if execution.status in (execution.CANCELLING, execution.CANCELLED):
-        return
-    execution.status = execution.STARTED
-    execution.started_at = datetime.utcnow()
-    workflow_context.execution = execution
+    with workflow_context.track_execution:
+        execution = workflow_context.execution
+        # the execution may already be in the process of cancelling
+        if execution.status in (execution.CANCELLING, execution.CANCELLED):
+            return
+        execution.status = execution.STARTED
+        execution.started_at = datetime.utcnow()
 
 
 @events.on_failure_workflow_signal.connect
 def _workflow_failed(workflow_context, exception, *args, **kwargs):
-    execution = workflow_context.execution
-    execution.error = str(exception)
-    execution.status = execution.FAILED
-    execution.ended_at = datetime.utcnow()
-    workflow_context.execution = execution
+    with workflow_context.track_execution:
+        execution = workflow_context.execution
+        execution.error = str(exception)
+        execution.status = execution.FAILED
+        execution.ended_at = datetime.utcnow()
 
 
 @events.on_success_workflow_signal.connect
 def _workflow_succeeded(workflow_context, *args, **kwargs):
-    execution = workflow_context.execution
-    execution.status = execution.SUCCEEDED
-    execution.ended_at = datetime.utcnow()
-    workflow_context.execution = execution
+    with workflow_context.track_execution:
+        execution = workflow_context.execution
+        execution.status = execution.SUCCEEDED
+        execution.ended_at = datetime.utcnow()
 
 
 @events.on_cancelled_workflow_signal.connect
 def _workflow_cancelled(workflow_context, *args, **kwargs):
-    execution = workflow_context.execution
-    # _workflow_cancelling function may have called this function already
-    if execution.status == execution.CANCELLED:
-        return
-    # the execution may have already been finished
-    elif execution.status in (execution.SUCCEEDED, execution.FAILED):
-        _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
-    else:
-        execution.status = execution.CANCELLED
-        execution.ended_at = datetime.utcnow()
-        workflow_context.execution = execution
+    with workflow_context.track_execution:
+        execution = workflow_context.execution
+        # _workflow_cancelling function may have called this function already
+        if execution.status == execution.CANCELLED:
+            return
+        # the execution may have already been finished
+        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+        else:
+            execution.status = execution.CANCELLED
+            execution.ended_at = datetime.utcnow()
 
 
 @events.on_cancelling_workflow_signal.connect
 def _workflow_cancelling(workflow_context, *args, **kwargs):
-    execution = workflow_context.execution
-    if execution.status == execution.PENDING:
-        return _workflow_cancelled(workflow_context=workflow_context)
-    # the execution may have already been finished
-    elif execution.status in (execution.SUCCEEDED, execution.FAILED):
-        _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
-    else:
-        execution.status = execution.CANCELLING
-        workflow_context.execution = execution
+    with workflow_context.track_execution:
+        execution = workflow_context.execution
+        if execution.status == execution.PENDING:
+            return _workflow_cancelled(workflow_context=workflow_context)
+        # the execution may have already been finished
+        elif execution.status in (execution.SUCCEEDED, execution.FAILED):
+            _log_tried_to_cancel_execution_but_it_already_ended(workflow_context, execution.status)
+        else:
+            execution.status = execution.CANCELLING
+            workflow_context.execution = execution
 
 
 def _update_node_state_if_necessary(ctx, is_transitional=False):

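For reference, the retry decision that now runs inside the track_task block is unchanged in substance. A self-contained sketch of that logic, with hypothetical Task and exception stand-ins mirroring the ctx.task fields and exception types the handler uses:

    class TaskAbortException(Exception):
        pass


    class TaskRetryException(Exception):
        def __init__(self, message, retry_interval=None):
            super(TaskRetryException, self).__init__(message)
            self.retry_interval = retry_interval


    INFINITE_RETRIES = -1


    def should_retry(task, exception):
        # An abort always wins; otherwise retry while attempts remain, unless
        # the failure is explicitly ignored (the task is then marked failed,
        # but the engine does not fail the workflow).
        return all([
            not isinstance(exception, TaskAbortException),
            task.attempts_count < task.max_attempts or
            task.max_attempts == INFINITE_RETRIES,
            not task.ignore_failure,
        ])


    class Task(object):
        def __init__(self, attempts_count, max_attempts, ignore_failure=False):
            self.attempts_count = attempts_count
            self.max_attempts = max_attempts
            self.ignore_failure = ignore_failure


    assert should_retry(Task(1, 3), TaskRetryException('boom'))
    assert not should_retry(Task(1, 3), TaskAbortException('stop'))
    assert not should_retry(Task(3, 3), TaskRetryException('boom'))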
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/21120aa6/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 4cee867..e45367f 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -35,54 +35,66 @@ def _get_task_name(task):
 
 @events.start_task_signal.connect
 def _start_task_handler(ctx, **kwargs):
-    # If the task has no function this is an empty task.
-    if ctx.task.function:
-        suffix = 'started...'
-        logger = ctx.logger.info
-    else:
-        suffix = 'has no implementation'
-        logger = ctx.logger.debug
+    with ctx.track_task:
+        # If the task has no function this is an empty task.
+        if ctx.task.function:
+            suffix = 'started...'
+            logger = ctx.logger.info
+        else:
+            suffix = 'has no implementation'
+            logger = ctx.logger.debug
 
-    logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
-        name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
+        logger('{name} {task.interface_name}.{task.operation_name} {suffix}'.format(
+            name=_get_task_name(ctx.task), task=ctx.task, suffix=suffix))
 
 
 @events.on_success_task_signal.connect
 def _success_task_handler(ctx, **kwargs):
-    if not ctx.task.function:
-        return
-    ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
-                    .format(name=_get_task_name(ctx.task), task=ctx.task))
+    with ctx.track_task:
+        if not ctx.task.function:
+            return
+        ctx.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
+                        .format(name=_get_task_name(ctx.task), task=ctx.task))
 
 
 @events.on_failure_task_signal.connect
 def _failure_operation_handler(ctx, traceback, **kwargs):
-    ctx.logger.error(
-        '{name} {task.interface_name}.{task.operation_name} failed'
-        .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
-    )
+    with ctx.track_task:
+        ctx.logger.error(
+            '{name} {task.interface_name}.{task.operation_name} failed'
+            .format(name=_get_task_name(ctx.task), task=ctx.task), extra=dict(traceback=traceback)
+        )
 
 
 @events.start_workflow_signal.connect
 def _start_workflow_handler(context, **kwargs):
-    context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
+    with context.track_execution:
+        context.logger.info("Starting '{ctx.workflow_name}' workflow execution".format(ctx=context))
 
 
 @events.on_failure_workflow_signal.connect
 def _failure_workflow_handler(context, **kwargs):
-    context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
+    with context.track_execution:
+        context.logger.info("'{ctx.workflow_name}' workflow execution failed".format(ctx=context))
 
 
 @events.on_success_workflow_signal.connect
 def _success_workflow_handler(context, **kwargs):
-    context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(ctx=context))
+    with context.track_execution:
+        context.logger.info("'{ctx.workflow_name}' workflow execution succeeded".format(
+            ctx=context)
+        )
 
 
 @events.on_cancelled_workflow_signal.connect
 def _cancel_workflow_handler(context, **kwargs):
-    context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
+    with context.track_execution:
+        context.logger.info("'{ctx.workflow_name}' workflow execution canceled".format(ctx=context))
 
 
 @events.on_cancelling_workflow_signal.connect
 def _cancelling_workflow_handler(context, **kwargs):
-    context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(ctx=context))
+    with context.track_execution:
+        context.logger.info("Cancelling '{ctx.workflow_name}' workflow execution".format(
+            ctx=context)
+        )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/21120aa6/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index a93e4d5..a2c9c62 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -51,8 +51,7 @@ class BaseExecutor(logger.LoggerMixin):
 
     @staticmethod
     def _task_started(ctx):
-        with ctx.track_task:
-            events.start_task_signal.send(ctx)
+        events.start_task_signal.send(ctx)
 
     @staticmethod
     def _task_failed(ctx, exception, traceback=None):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/21120aa6/tests/orchestrator/test_workflow_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_workflow_runner.py b/tests/orchestrator/test_workflow_runner.py
index bc4bab0..2ae1e59 100644
--- a/tests/orchestrator/test_workflow_runner.py
+++ b/tests/orchestrator/test_workflow_runner.py
@@ -148,7 +148,7 @@ def test_cancel_execution(request):
     with mock.patch('aria.orchestrator.workflow_runner.Engine', return_value=mock_engine):
         workflow_runner = _create_workflow_runner(request, mock_workflow)
         workflow_runner.cancel()
-        mock_engine.cancel_execution.assert_called_once_with()
+        mock_engine.cancel_execution.assert_called_once_with(ctx=workflow_runner._workflow_context)
 
 
 def test_execution_model_creation(request, service, model):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/21120aa6/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 2fbf4a9..64abaa9 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -101,8 +101,8 @@ class BaseTest(object):
 
     @pytest.fixture(autouse=True)
     def signals_registration(self, ):
-        def sent_task_handler(task, *args, **kwargs):
-            if task.stub_type is None:
+        def sent_task_handler(ctx, *args, **kwargs):
+            if ctx.task.stub_type is None:
                 calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
                 global_test_holder['sent_task_signal_calls'] = calls + 1
 

