ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dankil...@apache.org
Subject incubator-ariatosca git commit: ARIA-14 Implement initial engine tests
Date Tue, 01 Nov 2016 17:46:55 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-14-workflow-engine-tests [created] 5a77c4716


ARIA-14 Implement initial engine tests


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5a77c471
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5a77c471
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5a77c471

Branch: refs/heads/ARIA-14-workflow-engine-tests
Commit: 5a77c47163728246c1189e8cb44b71ade1cbfeda
Parents: c0bf347
Author: Dan Kilman <dankilman@gmail.com>
Authored: Tue Nov 1 16:42:34 2016 +0200
Committer: Dan Kilman <dankilman@gmail.com>
Committed: Tue Nov 1 19:46:02 2016 +0200

----------------------------------------------------------------------
 aria/contexts.py                     |   4 +-
 aria/events/__init__.py              |   1 +
 aria/events/builtin_event_handler.py |  15 ++-
 aria/storage/models.py               |   4 +-
 aria/tools/application.py            |  10 +-
 aria/workflows/core/engine.py        |  17 ++-
 aria/workflows/core/tasks.py         |  33 ++++--
 tests/workflows/test_engine.py       | 179 ++++++++++++++++++++++++++++++
 8 files changed, 239 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5a77c471/aria/contexts.py
----------------------------------------------------------------------
diff --git a/aria/contexts.py b/aria/contexts.py
index ae7fc66..fdd26a2 100644
--- a/aria/contexts.py
+++ b/aria/contexts.py
@@ -201,11 +201,11 @@ class OperationContext(LoggerMixin):
         """
         The model operation
         """
-        return self.storage.operation.get(self.id)
+        return self.model.operation.get(self.id)
 
     @operation.setter
     def operation(self, value):
         """
         Store the operation in the model storage
         """
-        self.storage.operation.store(value)
+        self.model.operation.store(value)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5a77c471/aria/events/__init__.py
----------------------------------------------------------------------
diff --git a/aria/events/__init__.py b/aria/events/__init__.py
index 6b07213..74f3e22 100644
--- a/aria/events/__init__.py
+++ b/aria/events/__init__.py
@@ -39,6 +39,7 @@ from blinker import signal
 from ..tools.plugin import plugin_installer
 
 # workflow engine task signals:
+sent_task_signal = signal('sent_task_signal')
 start_task_signal = signal('start_task_signal')
 on_success_task_signal = signal('success_task_signal')
 on_failure_task_signal = signal('failure_task_signal')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5a77c471/aria/events/builtin_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py
index ec3238f..2dfbd00 100644
--- a/aria/events/builtin_event_handler.py
+++ b/aria/events/builtin_event_handler.py
@@ -27,12 +27,21 @@ from . import (
     start_workflow_signal,
     on_success_workflow_signal,
     on_failure_workflow_signal,
+    sent_task_signal,
     start_task_signal,
     on_success_task_signal,
     on_failure_task_signal,
 )
 
 
+@sent_task_signal.connect
+def _task_sent(task, *args, **kwargs):
+    operation_context = task.context
+    operation = operation_context.operation
+    operation.status = operation.SENT
+    operation_context.operation = operation
+
+
 @start_task_signal.connect
 def _task_started(task, *args, **kwargs):
     operation_context = task.context
@@ -62,7 +71,7 @@ def _task_succeeded(task, *args, **kwargs):
 
 @start_workflow_signal.connect
 def _workflow_started(workflow_context, *args, **kwargs):
-    execution_cls = workflow_context.storage.execution.model_cls
+    execution_cls = workflow_context.model.execution.model_cls
     execution = execution_cls(
         id=workflow_context.execution_id,
         deployment_id=workflow_context.deployment_id,
@@ -80,7 +89,7 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs):
     execution = workflow_context.execution
     execution.error = str(exception)
     execution.status = execution.FAILED
-    execution.ended_at = datetime.utcnow(),
+    execution.ended_at = datetime.utcnow()
     workflow_context.execution = execution
 
 
@@ -88,5 +97,5 @@ def _workflow_failed(workflow_context, exception, *args, **kwargs):
 def _workflow_succeeded(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
     execution.status = execution.TERMINATED
-    execution.ended_at = datetime.utcnow(),
+    execution.ended_at = datetime.utcnow()
     workflow_context.execution = execution

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5a77c471/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index 32403ed..eb6b8e8 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -221,11 +221,13 @@ class Operation(Model):
     A Model which represents an operation
     """
     PENDING = 'pending'
+    SENT = 'sent'
     STARTED = 'started'
     SUCCESS = 'success'
     FAILED = 'failed'
     STATES = (
         PENDING,
+        SENT,
         STARTED,
         SUCCESS,
         FAILED,
@@ -233,7 +235,7 @@ class Operation(Model):
     END_STATES = [SUCCESS, FAILED]
 
     id = Field(type=basestring, default=uuid_generator)
-    status = Field(type=basestring, choices=STATES, default=STARTED)
+    status = Field(type=basestring, choices=STATES, default=PENDING)
     execution_id = Field(type=basestring)
     eta = Field(type=datetime, default=0)
     started_at = Field(type=datetime, default=None)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5a77c471/aria/tools/application.py
----------------------------------------------------------------------
diff --git a/aria/tools/application.py b/aria/tools/application.py
index 32feeff..ddc1317 100644
--- a/aria/tools/application.py
+++ b/aria/tools/application.py
@@ -85,11 +85,11 @@ class StorageManager(LoggerMixin):
         Create a StorageManager from a blueprint
         """
         return cls(
-            model_storage,
-            resource_storage,
-            blueprint_path,
-            blueprint_plan,
-            blueprint_id,
+            model_storage=model_storage,
+            resource_storage=resource_storage,
+            blueprint_path=blueprint_path,
+            blueprint_plan=blueprint_plan,
+            blueprint_id=blueprint_id,
             deployment_id=None,
             deployment_plan=None)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5a77c471/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
index 9c6eff8..5853c61 100644
--- a/aria/workflows/core/engine.py
+++ b/aria/workflows/core/engine.py
@@ -22,8 +22,10 @@ import time
 import networkx
 
 from aria import events, logger
+from aria.storage import models
 
 from . import translation
+from . import tasks
 
 
 class Engine(logger.LoggerMixin):
@@ -64,15 +66,16 @@ class Engine(logger.LoggerMixin):
     def _executable_tasks(self):
         now = time.time()
         return (task for task in self._tasks_iter()
-                if task.status == task.PENDING and
+                if task.status == models.Operation.PENDING and
                 task.eta <= now and
                 not self._task_has_dependencies(task))
 
     def _ended_tasks(self):
-        return (task for task in self._tasks_iter() if task.status in task.END_STATES)
+        return (task for task in self._tasks_iter()
+                if task.status in models.Operation.END_STATES)
 
     def _task_has_dependencies(self, task):
-        return len(self._execution_graph.succ.get(task.id, {})) > 0
+        return len(self._execution_graph.pred.get(task.id, {})) > 0
 
     def _all_tasks_consumed(self):
         return len(self._execution_graph.node) == 0
@@ -81,10 +84,14 @@ class Engine(logger.LoggerMixin):
         return (data['task'] for _, data in self._execution_graph.nodes_iter(data=True))
 
     def _handle_executable_task(self, task):
-        self._executor.execute(task)
+        if isinstance(task, tasks.BaseWorkflowTask):
+            task.status = models.Operation.SUCCESS
+        else:
+            events.sent_task_signal.send(task)
+            self._executor.execute(task)
 
     def _handle_ended_tasks(self, task):
-        if task.status == task.FAILED:
+        if task.status == models.Operation.FAILED:
             raise RuntimeError('Workflow failed')
         else:
             self._execution_graph.remove_node(task.id)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5a77c471/aria/workflows/core/tasks.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/tasks.py b/aria/workflows/core/tasks.py
index 76ae609..625802a 100644
--- a/aria/workflows/core/tasks.py
+++ b/aria/workflows/core/tasks.py
@@ -17,13 +17,19 @@
 Workflow tasks
 """
 
+import time
 
-class BaseTask(object):
+from aria import logger
+from aria.storage import models
+
+
+class BaseTask(logger.LoggerMixin):
     """
     Base class for Task objects
     """
 
-    def __init__(self, id, name, context):
+    def __init__(self, id, name, context, *args, **kwargs):
+        super(BaseTask, self).__init__(*args, **kwargs)
         self._id = id
         self._name = name
         self._context = context
@@ -50,28 +56,39 @@ class BaseTask(object):
         return self._context
 
 
-class StartWorkflowTask(BaseTask):
+class BaseWorkflowTask(BaseTask):
+    """
+    Base class for all workflow wrapping tasks
+    """
+
+    def __init__(self, *args, **kwargs):
+        super(BaseWorkflowTask, self).__init__(*args, **kwargs)
+        self.status = models.Operation.PENDING
+        self.eta = time.time()
+
+
+class StartWorkflowTask(BaseWorkflowTask):
     """
     Tasks marking a workflow start
     """
     pass
 
 
-class EndWorkflowTask(BaseTask):
+class EndWorkflowTask(BaseWorkflowTask):
     """
     Tasks marking a workflow end
     """
     pass
 
 
-class StartSubWorkflowTask(BaseTask):
+class StartSubWorkflowTask(BaseWorkflowTask):
     """
     Tasks marking a subworkflow start
     """
     pass
 
 
-class EndSubWorkflowTask(BaseTask):
+class EndSubWorkflowTask(BaseWorkflowTask):
     """
     Tasks marking a subworkflow end
     """
@@ -88,7 +105,7 @@ class OperationTask(BaseTask):
         self._create_operation_in_storage()
 
     def _create_operation_in_storage(self):
-        operation_cls = self.context.storage.operation.model_cls
+        operation_cls = self.context.model.operation.model_cls
         operation = operation_cls(
             id=self.context.id,
             execution_id=self.context.execution_id,
@@ -99,6 +116,6 @@ class OperationTask(BaseTask):
 
     def __getattr__(self, attr):
         try:
-            return getattr(self.context, attr)
+            return getattr(self.context.operation, attr)
         except AttributeError:
             return super(OperationTask, self).__getattribute__(attr)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5a77c471/tests/workflows/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py
new file mode 100644
index 0000000..6b37e7f
--- /dev/null
+++ b/tests/workflows/test_engine.py
@@ -0,0 +1,179 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+import pytest
+
+import aria
+from aria import events
+from aria import workflow
+from aria import contexts
+from aria.tools import application
+from aria.storage import drivers
+from aria.workflows.executor import thread
+from aria.workflows.core import engine
+
+logging.basicConfig()
+
+
+class TestEngine(object):
+
+    def test_empty_graph_execution(self, workflow_context):
+        self.executor = thread.ThreadExecutor()
+
+        @workflow
+        def mock_workflow(context, graph):
+            return graph
+
+        graph = mock_workflow(context=workflow_context)
+        eng = engine.Engine(executor=self.executor,
+                            workflow_context=workflow_context,
+                            tasks_graph=graph)
+        eng.execute()
+        assert workflow_context.states == ['start', 'success']
+
+    def test_single_task_successful_execution(self, workflow_context):
+        self.executor = thread.ThreadExecutor()
+
+        @workflow
+        def mock_workflow(context, graph):
+            graph.add_task(context.operation(
+                name='failing',
+                operation_details={'operation': 'tests.workflows.test_engine.{name}'.format(
+                    name=mock_success_task.__name__)},
+                node_instance=None
+            ))
+            return graph
+
+        graph = mock_workflow(context=workflow_context)
+        eng = engine.Engine(executor=self.executor,
+                            workflow_context=workflow_context,
+                            tasks_graph=graph)
+        execution_tasks = [data['task'] for _, data in eng._execution_graph.nodes_iter(data=True)]
+        eng.execute()
+        assert workflow_context.states == ['start', 'success']
+        assert any(hasattr(t, 'sent') and t.sent is True for t in execution_tasks)
+
+    def test_single_task_failed_execution(self, workflow_context):
+        self.executor = thread.ThreadExecutor()
+
+        @workflow
+        def mock_workflow(context, graph):
+            graph.add_task(context.operation(
+                name='failing',
+                operation_details={'operation': 'tests.workflows.test_engine.{name}'.format(
+                    name=mock_failed_task.__name__)},
+                node_instance=None
+            ))
+            return graph
+
+        graph = mock_workflow(context=workflow_context)
+        eng = engine.Engine(executor=self.executor,
+                            workflow_context=workflow_context,
+                            tasks_graph=graph)
+        execution_tasks = [data['task'] for _, data in eng._execution_graph.nodes_iter(data=True)]
+        with pytest.raises(RuntimeError):
+            eng.execute()
+        assert workflow_context.states == ['start', 'failure']
+        assert isinstance(workflow_context.exception, RuntimeError)
+        assert any(hasattr(t, 'sent') and t.sent is True for t in execution_tasks)
+
+    def setup_method(self):
+        events.start_workflow_signal.connect(start_workflow_handler)
+        events.on_success_workflow_signal.connect(success_workflow_handler)
+        events.on_failure_workflow_signal.connect(failure_workflow_handler)
+        events.sent_task_signal.connect(sent_task_handler)
+
+    def teardown_method(self):
+        events.start_workflow_signal.disconnect(start_workflow_handler)
+        events.on_success_workflow_signal.disconnect(success_workflow_handler)
+        events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
+        events.sent_task_signal.disconnect(sent_task_handler)
+        if self.executor:
+            self.executor.close()
+
+
+def sent_task_handler(task, *args, **kwargs):
+    task.sent = True
+
+
+def start_workflow_handler(workflow_context, *args, **kwargs):
+    workflow_context.states.append('start')
+
+
+def success_workflow_handler(workflow_context, *args, **kwargs):
+    workflow_context.states.append('success')
+
+
+def failure_workflow_handler(workflow_context, exception, *args, **kwargs):
+    workflow_context.states.append('failure')
+    workflow_context.exception = exception
+
+
+def mock_success_task():
+    pass
+
+
+def mock_failed_task():
+    raise RuntimeError
+
+
+@pytest.fixture(scope='function')
+def workflow_context(tmpdir):
+    from dsl_parser.parser import parse_from_path
+    from dsl_parser.tasks import prepare_deployment_plan
+    blueprint = 'tosca_definitions_version: cloudify_dsl_1_3\nnode_templates: {}'
+    blueprint_dir = tmpdir.mkdir('blueprint')
+    blueprint_path = blueprint_dir.join('blueprint.yaml')
+    blueprint_path.write(blueprint)
+    blueprint_plan = parse_from_path(str(blueprint_path))
+    blueprint_id = 'b1'
+    deployment_plan = prepare_deployment_plan(blueprint_plan.copy())
+    deployment_id = 'd1'
+    work_dir = tmpdir.mkdir('work')
+    storage_dir = work_dir.mkdir('storage')
+    model_storage_dir = storage_dir.mkdir('model')
+    resource_storage_dir = storage_dir.mkdir('resource')
+    model_storage = aria.application_model_storage(
+        drivers.FileSystemModelDriver(str(model_storage_dir)))
+    resource_storage = aria.application_resource_storage(
+        drivers.FileSystemResourceDriver(str(resource_storage_dir)))
+    resource_storage.setup()
+    model_storage.setup()
+    storage_manager = application.StorageManager(
+        model_storage=model_storage,
+        resource_storage=resource_storage,
+        blueprint_path=blueprint_path,
+        blueprint_id=blueprint_id,
+        blueprint_plan=blueprint_plan,
+        deployment_id=deployment_id,
+        deployment_plan=deployment_plan
+    )
+    storage_manager.create_blueprint_storage(
+        source=str(blueprint_path),
+        main_file_name='blueprint.yaml')
+    storage_manager.create_nodes_storage()
+    storage_manager.create_deployment_storage()
+    storage_manager.create_node_instances_storage()
+    result = contexts.WorkflowContext(
+        name='test',
+        model_storage=model_storage,
+        resource_storage=resource_storage,
+        deployment_id=deployment_id,
+        workflow_id='name')
+    result.states = []
+    result.exception = None
+    return result


Mime
View raw message