ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dankil...@apache.org
Subject incubator-ariatosca git commit: ARIA-74 Add process executor extension registration hooks
Date Mon, 23 Jan 2017 15:30:39 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-74-process-executor-hook [created] fdbed5359


ARIA-74 Add process executor extension registration hooks


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/fdbed535
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/fdbed535
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/fdbed535

Branch: refs/heads/ARIA-74-process-executor-hook
Commit: fdbed53598e6692ac6d3da3b3515749ad08b1124
Parents: eaf9974
Author: Dan Kilman <dank@gigaspaces.com>
Authored: Mon Jan 23 16:55:41 2017 +0200
Committer: Dan Kilman <dank@gigaspaces.com>
Committed: Mon Jan 23 17:26:07 2017 +0200

----------------------------------------------------------------------
 aria/extension.py                               | 15 ++++
 aria/orchestrator/workflows/executor/process.py |  5 ++
 .../executor/test_process_executor_extension.py | 80 ++++++++++++++++++++
 3 files changed, 100 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdbed535/aria/extension.py
----------------------------------------------------------------------
diff --git a/aria/extension.py b/aria/extension.py
index ddb7c25..6e40038 100644
--- a/aria/extension.py
+++ b/aria/extension.py
@@ -118,8 +118,23 @@ class _ParserExtensionRegistration(_ExtensionRegistration):
 parser = _ParserExtensionRegistration()
 
 
+class _ProcessExecutorExtensionRegistration(_ExtensionRegistration):
+    """Process executor extension class decorator"""
+
+    @_registrar
+    def decorate(self):
+        """
+        The operation function executed by the process executor will be decorated with the function
+        returned from decorate().
+        """
+        return []
+
+executor = _ProcessExecutorExtensionRegistration()
+
+
 def init():
     """
     Initialize all registrars by calling all registered functions
     """
     parser.init()
+    executor.init()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdbed535/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 2cc9178..770a060 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -39,6 +39,8 @@ import Queue
 
 import jsonpickle
 
+import aria
+from aria.extension import executor
 from aria.utils import imports
 from aria.utils import exceptions
 from aria.orchestrator.workflows.executor import base
@@ -291,6 +293,9 @@ def _main():
         try:
             ctx = serialization.operation_context_from_dict(context_dict)
             task_func = imports.load_attribute(operation_mapping)
+            aria.install_aria_extensions()
+            for decorate in executor.decorate():
+                task_func = decorate(task_func)
             task_func(ctx=ctx, **operation_inputs)
             messenger.succeeded(tracked_changes=instrument.tracked_changes)
         except BaseException as e:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/fdbed535/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
new file mode 100644
index 0000000..fd46d14
--- /dev/null
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -0,0 +1,80 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria import extension
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+
+import tests
+from tests import mock
+from tests import storage
+
+
+def test_decorate_extension(context, executor):
+    inputs = {'input1': 1, 'input2': 2}
+
+    def get_node_instance(ctx):
+        return ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+
+    @workflow
+    def mock_workflow(ctx, graph):
+        node_instance = get_node_instance(ctx)
+        op = 'test.op'
+        op_dict = {'operation': '{0}.{1}'.format(__name__, _mock_operation.__name__)}
+        node_instance.node.operations['test.op'] = op_dict
+        task = api.task.OperationTask.node_instance(instance=node_instance, name=op, inputs=inputs)
+        graph.add_tasks(task)
+        return graph
+    graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
+    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+    eng.execute()
+    out = get_node_instance(context).runtime_properties['out']
+    assert out['wrapper_inputs'] == inputs
+    assert out['function_inputs'] == inputs
+
+
+@extension.executor
+class MockProcessExecutorExtension(object):
+
+    def decorate(self):
+        def decorator(function):
+            def wrapper(ctx, **operation_inputs):
+                ctx.node_instance.runtime_properties['out'] = {'wrapper_inputs': operation_inputs}
+                function(ctx=ctx, **operation_inputs)
+            return wrapper
+        return decorator
+
+
+@operation
+def _mock_operation(ctx, **operation_inputs):
+    ctx.node_instance.runtime_properties['out']['function_inputs'] = operation_inputs
+
+
+@pytest.fixture
+def executor():
+    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+    yield result
+    result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+    result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir)))
+    yield result
+    storage.release_sqlite_storage(result.model)


Mime
View raw message