ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject [1/2] incubator-ariatosca git commit: wip
Date Sun, 30 Apr 2017 16:54:24 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/stub_task_branch [created] 222bef595


wip


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/282fcbf9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/282fcbf9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/282fcbf9

Branch: refs/heads/stub_task_branch
Commit: 282fcbf9b51e05fe1d1fef2a0835e9c4654dea3f
Parents: 1f3e7ff
Author: max-orlov <maxim@gigaspaces.com>
Authored: Sun Apr 30 16:05:27 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Sun Apr 30 16:05:27 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/events.py                     |  8 ++++
 aria/orchestrator/workflows/core/engine.py      |  6 ++-
 aria/orchestrator/workflows/core/task.py        | 15 +++++++-
 aria/orchestrator/workflows/core/translation.py |  4 +-
 aria/orchestrator/workflows/events_logging.py   | 12 ++++--
 aria/orchestrator/workflows/executor/base.py    |  4 ++
 aria/orchestrator/workflows/executor/dry.py     | 39 +++++++++++++-------
 tests/end2end/test_hello_world.py               |  3 +-
 8 files changed, 69 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/events.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events.py b/aria/orchestrator/events.py
index a1c4922..812040b 100644
--- a/aria/orchestrator/events.py
+++ b/aria/orchestrator/events.py
@@ -28,6 +28,14 @@ start_task_signal = signal('start_task_signal')
 on_success_task_signal = signal('success_task_signal')
 on_failure_task_signal = signal('failure_task_signal')
 
+# node state signals:
+# Note that each signal corresponds with a task. The basic start_task_signal also changes
the state
+# of the node on which it runs. (so does the on_success_task_signal and the on_failure_task_signal)
+start_node_signal = signal('start_task_signal')
+on_success_node_signal = signal('success_task_signal')
+on_failure_node_signal = signal('failure_task_signal')
+
+
 # workflow engine workflow signals:
 start_workflow_signal = signal('start_workflow_signal')
 on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 155d0ee..97c4999 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -44,7 +44,8 @@ class Engine(logger.LoggerMixin):
         self._execution_graph = networkx.DiGraph()
         self._executor = executor
         translation.build_execution_graph(task_graph=tasks_graph,
-                                          execution_graph=self._execution_graph)
+                                          execution_graph=self._execution_graph,
+                                          executor=self._executor)
 
     def execute(self):
         """
@@ -110,11 +111,12 @@ class Engine(logger.LoggerMixin):
             yield task
 
     def _handle_executable_task(self, task):
+        # import pydevd; pydevd.settrace('localhost', suspend=False)
         if isinstance(task, engine_task.StubTask):
             task.status = models.Task.SUCCESS
         else:
             events.sent_task_signal.send(task)
-            self._executor.execute(task)
+            task.execute()
 
     def _handle_ended_tasks(self, task):
         if task.status == models.Task.FAILED and not task.ignore_failure:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 2b26152..548bf47 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -28,6 +28,7 @@ from functools import (
 from ....modeling import models
 from ...context import operation as operation_context
 from .. import exceptions
+from ..executor import dry
 
 
 def _locked(func=None):
@@ -47,9 +48,13 @@ class BaseTask(object):
     Base class for Task objects
     """
 
-    def __init__(self, id, *args, **kwargs):
+    def __init__(self, id, executor, *args, **kwargs):
         super(BaseTask, self).__init__(*args, **kwargs)
         self._id = id
+        self._executor = executor
+
+    def execute(self):
+        self._executor.execute(self)
 
     @property
     def id(self):
@@ -65,7 +70,7 @@ class StubTask(BaseTask):
     """
 
     def __init__(self, *args, **kwargs):
-        super(StubTask, self).__init__(*args, **kwargs)
+        super(StubTask, self).__init__(executor=dry.StubExecutor(), *args, **kwargs)
         self.status = models.Task.PENDING
         self.due_at = datetime.utcnow()
 
@@ -128,6 +133,12 @@ class OperationTask(BaseTask):
             raise RuntimeError('No operation context could be created for {actor.model_cls}'
                                .format(actor=api_task.actor))
 
+        # TODO: this executor should be put into the task (if no executor was setup in the
+        # operation)
+        # executor = '{module}.{name}'.format(module=self._executor.__module__,
+        #                                     name=self._executor.__class__.__name__
+        #                                    )
+
         task_model = create_task_model(
             name=api_task.name,
             implementation=api_task.implementation,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/core/translation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py
index b6cbdad..1ae59a3 100644
--- a/aria/orchestrator/workflows/core/translation.py
+++ b/aria/orchestrator/workflows/core/translation.py
@@ -24,6 +24,7 @@ from . import task as core_task
 def build_execution_graph(
         task_graph,
         execution_graph,
+        executor,
         start_cls=core_task.StartWorkflowTask,
         end_cls=core_task.EndWorkflowTask,
         depends_on=()):
@@ -49,13 +50,14 @@ def build_execution_graph(
 
         if isinstance(api_task, api.task.OperationTask):
             # Add the task an the dependencies
-            operation_task = core_task.OperationTask(api_task)
+            operation_task = core_task.OperationTask(api_task, executor=executor)
             _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies)
         elif isinstance(api_task, api.task.WorkflowTask):
             # Build the graph recursively while adding start and end markers
             build_execution_graph(
                 task_graph=api_task,
                 execution_graph=execution_graph,
+                executor=executor,
                 start_cls=core_task.StartSubWorkflowTask,
                 end_cls=core_task.EndSubWorkflowTask,
                 depends_on=operation_dependencies

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 3ffe18b..fa993d0 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -34,19 +34,25 @@ def _get_task_name(task):
 
 
 @events.start_task_signal.connect
-def _start_task_handler(task, **kwargs):
+def _start_task_handler(task, skip_logging=False, **kwargs):
+    if skip_logging:
+        return
     task.context.logger.info('{name} {task.interface_name}.{task.operation_name} started...'
                              .format(name=_get_task_name(task), task=task))
 
 
 @events.on_success_task_signal.connect
-def _success_task_handler(task, **kwargs):
+def _success_task_handler(task, skip_logging=False, **kwargs):
+    if skip_logging:
+        return
     task.context.logger.info('{name} {task.interface_name}.{task.operation_name} successful'
                              .format(name=_get_task_name(task), task=task))
 
 
 @events.on_failure_task_signal.connect
-def _failure_operation_handler(task, traceback, **kwargs):
+def _failure_operation_handler(task, traceback, skip_logging=False, **kwargs):
+    if skip_logging:
+        return
     task.context.logger.error(
         '{name} {task.interface_name}.{task.operation_name} failed'
         .format(name=_get_task_name(task), task=task), extra=dict(traceback=traceback)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/executor/base.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py
index 39becef..f11a6b7 100644
--- a/aria/orchestrator/workflows/executor/base.py
+++ b/aria/orchestrator/workflows/executor/base.py
@@ -40,6 +40,10 @@ class BaseExecutor(logger.LoggerMixin):
         pass
 
     @staticmethod
+    def _task_sent(task):
+        events.sent_task_signal.send(task)
+
+    @staticmethod
     def _task_started(task):
         events.start_task_signal.send(task)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
index e1261bb..5be8015 100644
--- a/aria/orchestrator/workflows/executor/dry.py
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -19,21 +19,40 @@ Dry executor
 
 from datetime import datetime
 
+from aria.orchestrator import events
 from .base import BaseExecutor
 
 
-class DryExecutor(BaseExecutor):
+# TODO: the name of this module should definitely change
+
+class StubExecutor(BaseExecutor):
+
+    @staticmethod
+    def _task_sent(*args, **kwargs):
+        pass
+
+    @staticmethod
+    def _task_started(task):
+        events.start_task_signal.send(task, skip_logging=True)
+
+    @staticmethod
+    def _task_succeeded(task):
+        events.on_success_task_signal.send(task, skip_logging=True)
+
+    @staticmethod
+    def _task_failed(*args, **kwargs):
+        pass
+
+    def execute(self, task):
+        pass
+
+
+class DryExecutor(StubExecutor):
     """
     Executor which dry runs tasks - prints task information without causing any side effects
     """
 
     def execute(self, task):
-        # updating the task manually instead of calling self._task_started(task),
-        # to avoid any side effects raising that event might cause
-        with task._update():
-            task.started_at = datetime.utcnow()
-            task.status = task.STARTED
-
         if hasattr(task.actor, 'source_node'):
             name = '{source_node.name}->{target_node.name}'.format(
                 source_node=task.actor.source_node, target_node=task.actor.target_node)
@@ -47,9 +66,3 @@ class DryExecutor(BaseExecutor):
         task.context.logger.info(
             '<dry> {name} {task.interface_name}.{task.operation_name} successful'
             .format(name=name, task=task))
-
-        # updating the task manually instead of calling self._task_succeeded(task),
-        # to avoid any side effects raising that event might cause
-        with task._update():
-            task.ended_at = datetime.utcnow()
-            task.status = task.SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/282fcbf9/tests/end2end/test_hello_world.py
----------------------------------------------------------------------
diff --git a/tests/end2end/test_hello_world.py b/tests/end2end/test_hello_world.py
index 09e5d06..8895593 100644
--- a/tests/end2end/test_hello_world.py
+++ b/tests/end2end/test_hello_world.py
@@ -30,7 +30,8 @@ def test_hello_world(testenv):
         # Even if some assertions failed, attempt to execute uninstall so the
         # webserver process doesn't stay up once the test is finished
         # TODO: remove force_service_delete=True
-        testenv.uninstall_service(force_service_delete=True)
+        pass
+        # testenv.uninstall_service(force_service_delete=True)
 
     _verify_webserver_down('http://localhost:9090')
     testenv.verify_clean_storage()


Mime
View raw message