ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From emblempar...@apache.org
Subject incubator-ariatosca git commit: ARIA-12 New "Runner" class for simple workflow executions
Date Wed, 25 Jan 2017 13:07:24 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/master 4c75aeba3 -> 372f2a150


ARIA-12 New "Runner" class for simple workflow executions


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/372f2a15
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/372f2a15
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/372f2a15

Branch: refs/heads/master
Commit: 372f2a15011800d3503a8688800ce18fc02aa04e
Parents: 4c75aeb
Author: Tal Liron <tal.liron@gmail.com>
Authored: Tue Jan 24 17:57:50 2017 +0200
Committer: Tal Liron <tal.liron@gmail.com>
Committed: Wed Jan 25 15:05:55 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/runner.py       | 135 +++++++++++++++++++++++++++++++++
 tests/mock/__init__.py            |   2 +-
 tests/mock/context.py             |  36 +--------
 tests/mock/topology.py            |  96 +++++++++++++++++++++++
 tests/orchestrator/test_runner.py |  74 ++++++++++++++++++
 5 files changed, 309 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/372f2a15/aria/orchestrator/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py
new file mode 100644
index 0000000..16acc19
--- /dev/null
+++ b/aria/orchestrator/runner.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow runner
+"""
+
+import platform
+import tempfile
+import os
+
+from sqlalchemy import (create_engine, orm) # @UnresolvedImport
+from sqlalchemy.pool import StaticPool # @UnresolvedImport
+
+from .context.workflow import WorkflowContext
+from .workflows.core.engine import Engine
+from .workflows.executor.thread import ThreadExecutor
+from ..storage import model
+from ..storage.sql_mapi import SQLAlchemyModelAPI
+from ..storage.filesystem_rapi import FileSystemResourceAPI
+from .. import (application_model_storage, application_resource_storage)
+
+
+SQLITE_IN_MEMORY = 'sqlite:///:memory:'
+
+
+class Runner(object):
+    """
+    Runs workflows on a deployment. By default uses temporary storage (either on disk or in memory)
+    but can also be used with existing storage.
+
+    Handles the initialization of the storage engine and provides convenience methods for
+    sub-classes to create tasks.
+
+    :param path: path to Sqlite database file; use '' (the default) to use a temporary file,
+                 and None to use an in-memory database
+    :type path: string
+    """
+
+    def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn,
+                 deployment_id, storage_path='', is_storage_temporary=True):
+        if storage_path == '':
+            # Temporary file storage
+            the_file, storage_path = tempfile.mkstemp(suffix='.db', prefix='aria-')
+            os.close(the_file)
+
+        self._storage_path = storage_path
+        self._is_storage_temporary = is_storage_temporary
+
+        workflow_context = self.create_workflow_context(workflow_name, deployment_id,
+                                                        initialize_model_storage_fn)
+
+        tasks_graph = workflow_fn(ctx=workflow_context, **inputs)
+
+        self._engine = Engine(
+            executor=ThreadExecutor(),
+            workflow_context=workflow_context,
+            tasks_graph=tasks_graph)
+
+    def run(self):
+        try:
+            self._engine.execute()
+        finally:
+            self.cleanup()
+
+    def create_workflow_context(self, workflow_name, deployment_id, initialize_model_storage_fn):
+        model_storage = self.create_sqlite_model_storage()
+        initialize_model_storage_fn(model_storage)
+        resource_storage = self.create_fs_resource_storage()
+        return WorkflowContext(
+            name=workflow_name,
+            model_storage=model_storage,
+            resource_storage=resource_storage,
+            deployment_id=deployment_id,
+            workflow_name=self.__class__.__name__,
+            task_max_attempts=1,
+            task_retry_interval=1)
+
+    def create_sqlite_model_storage(self): # pylint: disable=no-self-use
+        self.cleanup()
+
+        # Engine
+        if self._storage_path is None:
+            # In memory
+            # Causes serious threading problems:
+            # https://gehrcke.de/2015/05/in-memory-sqlite-database-and-flask-a-threading-trap/
+            sqlite_engine = create_engine(
+                SQLITE_IN_MEMORY,
+                connect_args={'check_same_thread': False},
+                poolclass=StaticPool)
+        else:
+            path_prefix = '' if 'Windows' in platform.system() else '/'
+            sqlite_engine = create_engine(
+                'sqlite:///%s%s' % (path_prefix, self._storage_path))
+
+        # Models
+        model.DeclarativeBase.metadata.create_all(bind=sqlite_engine) # @UndefinedVariable
+
+        # Session
+        sqlite_session_factory = orm.sessionmaker(bind=sqlite_engine)
+        if self._storage_path is None:
+            sqlite_session = sqlite_session_factory()
+        else:
+            # File-based storage only
+            sqlite_session = orm.scoped_session(session_factory=sqlite_session_factory)
+
+        # Storage
+        sqlite_kwargs = dict(engine=sqlite_engine, session=sqlite_session)
+        return application_model_storage(
+            SQLAlchemyModelAPI,
+            api_kwargs=sqlite_kwargs)
+
+    def create_fs_resource_storage(self, directory='.'): # pylint: disable=no-self-use
+        fs_kwargs = dict(directory=directory)
+        return application_resource_storage(
+            FileSystemResourceAPI,
+            api_kwargs=fs_kwargs)
+
+    def cleanup(self):
+        if self._is_storage_temporary \
+            and (self._storage_path is not None) \
+            and os.path.isfile(self._storage_path):
+            os.remove(self._storage_path)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/372f2a15/tests/mock/__init__.py
----------------------------------------------------------------------
diff --git a/tests/mock/__init__.py b/tests/mock/__init__.py
index 14541d0..9004b4c 100644
--- a/tests/mock/__init__.py
+++ b/tests/mock/__init__.py
@@ -13,4 +13,4 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from . import models, context, operations
+from . import models, context, topology, operations

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/372f2a15/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 5559675..ec4bfb8 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -19,43 +19,13 @@ from aria.storage.filesystem_rapi import FileSystemResourceAPI
 from aria.storage.sql_mapi import SQLAlchemyModelAPI
 
 from . import models
+from .topology import create_simple_topology_two_nodes
 
 
 def simple(mapi_kwargs, resources_dir=None, **kwargs):
     model_storage = aria.application_model_storage(SQLAlchemyModelAPI, api_kwargs=mapi_kwargs)
-    blueprint = models.get_blueprint()
-    model_storage.blueprint.put(blueprint)
-    deployment = models.get_deployment(blueprint)
-    model_storage.deployment.put(deployment)
 
-    #################################################################################
-    # Creating a simple deployment with node -> node as a graph
-
-    dependency_node = models.get_dependency_node(deployment)
-    model_storage.node.put(dependency_node)
-    storage_dependency_node = model_storage.node.get(dependency_node.id)
-
-    dependency_node_instance = models.get_dependency_node_instance(storage_dependency_node)
-    model_storage.node_instance.put(dependency_node_instance)
-    storage_dependency_node_instance = model_storage.node_instance.get(dependency_node_instance.id)
-
-    dependent_node = models.get_dependent_node(deployment)
-    model_storage.node.put(dependent_node)
-    storage_dependent_node = model_storage.node.get(dependent_node.id)
-
-    dependent_node_instance = models.get_dependent_node_instance(storage_dependent_node)
-    model_storage.node_instance.put(dependent_node_instance)
-    storage_dependent_node_instance = model_storage.node_instance.get(dependent_node_instance.id)
-
-    relationship = models.get_relationship(storage_dependent_node, storage_dependency_node)
-    model_storage.relationship.put(relationship)
-    storage_relationship = model_storage.relationship.get(relationship.id)
-    relationship_instance = models.get_relationship_instance(
-        relationship=storage_relationship,
-        target_instance=storage_dependency_node_instance,
-        source_instance=storage_dependent_node_instance
-    )
-    model_storage.relationship_instance.put(relationship_instance)
+    deployment_id = create_simple_topology_two_nodes(model_storage)
 
     # pytest tmpdir
     if resources_dir:
@@ -70,7 +40,7 @@ def simple(mapi_kwargs, resources_dir=None, **kwargs):
         name='simple_context',
         model_storage=model_storage,
         resource_storage=resource_storage,
-        deployment_id=deployment.id,
+        deployment_id=deployment_id,
         workflow_name=models.WORKFLOW_NAME,
         task_max_attempts=models.TASK_MAX_ATTEMPTS,
         task_retry_interval=models.TASK_RETRY_INTERVAL

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/372f2a15/tests/mock/topology.py
----------------------------------------------------------------------
diff --git a/tests/mock/topology.py b/tests/mock/topology.py
new file mode 100644
index 0000000..e219c33
--- /dev/null
+++ b/tests/mock/topology.py
@@ -0,0 +1,96 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from datetime import datetime
+
+from aria.storage import model
+
+from . import models
+
+
+def create_simple_topology_single_node(model_storage, deployment_id, create_operation):
+    now = datetime.utcnow()
+
+    blueprint = model.Blueprint(name='mock-blueprint',
+                                created_at=now,
+                                updated_at=now,
+                                plan={},
+                                main_file_name='mock-file')
+    model_storage.blueprint.put(blueprint)
+
+    deployment = model.Deployment(name='mock-deployment-%d' % deployment_id,
+                                  blueprint_fk=blueprint.id,
+                                  created_at=now,
+                                  updated_at=now)
+    model_storage.deployment.put(deployment)
+
+    node = model.Node(name='mock-node',
+                      type='tosca.nodes.Compute',
+                      operations={
+                          'tosca.interfaces.node.lifecycle.Standard.create': {
+                              'operation': create_operation,
+                              'inputs': {
+                                  'key': 'create',
+                                  'value': True}}},
+                      number_of_instances=1,
+                      planned_number_of_instances=1,
+                      deploy_number_of_instances=1,
+                      min_number_of_instances=1,
+                      max_number_of_instances=1,
+                      deployment_fk=deployment.id)
+    model_storage.node.put(node)
+
+    node_instance = model.NodeInstance(name='mock-node-instance',
+                                       state='',
+                                       node_fk=node.id)
+    model_storage.node_instance.put(node_instance)
+
+
+def create_simple_topology_two_nodes(model_storage):
+    blueprint = models.get_blueprint()
+    model_storage.blueprint.put(blueprint)
+    deployment = models.get_deployment(blueprint)
+    model_storage.deployment.put(deployment)
+
+    #################################################################################
+    # Creating a simple deployment with node -> node as a graph
+
+    dependency_node = models.get_dependency_node(deployment)
+    model_storage.node.put(dependency_node)
+    storage_dependency_node = model_storage.node.get(dependency_node.id)
+
+    dependency_node_instance = models.get_dependency_node_instance(storage_dependency_node)
+    model_storage.node_instance.put(dependency_node_instance)
+    storage_dependency_node_instance = model_storage.node_instance.get(dependency_node_instance.id)
+
+    dependent_node = models.get_dependent_node(deployment)
+    model_storage.node.put(dependent_node)
+    storage_dependent_node = model_storage.node.get(dependent_node.id)
+
+    dependent_node_instance = models.get_dependent_node_instance(storage_dependent_node)
+    model_storage.node_instance.put(dependent_node_instance)
+    storage_dependent_node_instance = model_storage.node_instance.get(dependent_node_instance.id)
+
+    relationship = models.get_relationship(storage_dependent_node, storage_dependency_node)
+    model_storage.relationship.put(relationship)
+    storage_relationship = model_storage.relationship.get(relationship.id)
+    relationship_instance = models.get_relationship_instance(
+        relationship=storage_relationship,
+        target_instance=storage_dependency_node_instance,
+        source_instance=storage_dependent_node_instance
+    )
+    model_storage.relationship_instance.put(relationship_instance)
+
+    return deployment.id

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/372f2a15/tests/orchestrator/test_runner.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/test_runner.py b/tests/orchestrator/test_runner.py
new file mode 100644
index 0000000..1d46e91
--- /dev/null
+++ b/tests/orchestrator/test_runner.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria import workflow
+from aria.orchestrator import operation
+from aria.orchestrator.workflows.api.task import OperationTask
+from aria.orchestrator.runner import Runner
+
+from tests import mock
+
+import pytest
+
+
+OPERATION_RESULTS = {}
+
+
+@operation
+def mock_create_operation(ctx, key, value, **kwargs): # pylint: disable=unused-argument
+    OPERATION_RESULTS[key] = value
+
+
+@pytest.fixture(autouse=True)
+def cleanup():
+    OPERATION_RESULTS.clear()
+
+
+def test_runner_no_tasks():
+    @workflow
+    def workflow_fn(ctx, graph): # pylint: disable=unused-argument
+        pass
+
+    _test_runner(workflow_fn)
+
+
+def test_runner_tasks():
+    @workflow
+    def workflow_fn(ctx, graph):
+        for node_instance in ctx.model.node_instance.iter():
+            graph.add_tasks(
+                OperationTask.node_instance(instance=node_instance,
+                                            name='tosca.interfaces.node.lifecycle.Standard.create'))
+
+    _test_runner(workflow_fn)
+
+    assert OPERATION_RESULTS.get('create') is True
+
+
+def _initialize_model_storage_fn(model_storage):
+    mock.topology.create_simple_topology_single_node(
+        model_storage,
+        1,
+        '%s.%s' % (__name__, mock_create_operation.__name__)
+    )
+
+
+def _test_runner(workflow_fn):
+    runner = Runner(workflow_name='runner workflow',
+                    workflow_fn=workflow_fn,
+                    inputs={},
+                    initialize_model_storage_fn=_initialize_model_storage_fn,
+                    deployment_id=1)
+    runner.run()


Mime
View raw message