ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject [1/3] incubator-ariatosca git commit: ARIA-4 Create an API for creating workflows ARIA-5-Adapt-workflow-API-users-to-modified-API [Forced Update!]
Date Mon, 07 Nov 2016 15:55:50 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-3-api-for-creating-workflows 2ea0541a8 -> 39db8fc9f (forced update)


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/builtin/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/builtin/__init__.py b/tests/workflows/builtin/__init__.py
new file mode 100644
index 0000000..c82c024
--- /dev/null
+++ b/tests/workflows/builtin/__init__.py
@@ -0,0 +1,86 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+from ... import mock
+
+def assert_node_install_operations(operations, with_relationships=False):
+    if with_relationships:
+        all_operations = [
+            'aria.interfaces.lifecycle.create',
+            'aria.interfaces.relationship_lifecycle.preconfigure',
+            'aria.interfaces.relationship_lifecycle.preconfigure',
+            'aria.interfaces.lifecycle.configure',
+            'aria.interfaces.relationship_lifecycle.postconfigure',
+            'aria.interfaces.relationship_lifecycle.postconfigure',
+            'aria.interfaces.lifecycle.start',
+            'aria.interfaces.relationship_lifecycle.establish',
+            'aria.interfaces.relationship_lifecycle.establish',
+        ]
+
+        for i, operation in enumerate(operations):
+            assert operation.name.startswith(all_operations[i])
+    else:
+        for i, operation in enumerate(operations):
+            assert operation.name.startswith(mock.operations.NODE_OPERATIONS_INSTALL[i])
+
+
+def assert_node_uninstall_operations(operations, with_relationships=False):
+    if with_relationships:
+        all_operations = [
+            'aria.interfaces.lifecycle.stop',
+            'aria.interfaces.relationship_lifecycle.unlink',
+            'aria.interfaces.relationship_lifecycle.unlink',
+            'aria.interfaces.lifecycle.delete',
+        ]
+
+        for i, operation in enumerate(operations):
+            assert operation.name.startswith(all_operations[i])
+    else:
+        for i, operation in enumerate(operations):
+            assert operation.name.startswith(mock.operations.NODE_OPERATIONS_UNINSTALL[i])
+
+
+def ctx_with_basic_graph():
+    """
+    Create the following graph in storage:
+    dependency_node <------ dependent_node
+    :return: the mock context with both nodes, their instances and the relationship stored
+    """
+    simple_context = mock.context.simple()
+    dependency_node = mock.models.get_dependency_node()
+    dependency_node_instance = mock.models.get_dependency_node_instance(
+        dependency_node=dependency_node)
+
+    relationship = mock.models.get_relationship(dependency_node)
+    relationship_instance = mock.models.get_relationship_instance(
+        relationship=relationship,
+        target_instance=dependency_node_instance
+    )
+
+    dependent_node = mock.models.get_dependent_node(relationship)
+    dependent_node_instance = mock.models.get_dependent_node_instance(
+        dependent_node=dependent_node,
+        relationship_instance=relationship_instance
+    )
+
+    simple_context.model.node.store(dependent_node)
+    simple_context.model.node.store(dependency_node)
+    simple_context.model.node_instance.store(dependent_node_instance)
+    simple_context.model.node_instance.store(dependency_node_instance)
+    simple_context.model.relationship.store(relationship)
+    simple_context.model.relationship_instance.store(relationship_instance)
+
+    return simple_context

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/builtin/test_execute_operation.py
----------------------------------------------------------------------
diff --git a/tests/workflows/builtin/test_execute_operation.py b/tests/workflows/builtin/test_execute_operation.py
new file mode 100644
index 0000000..9409686
--- /dev/null
+++ b/tests/workflows/builtin/test_execute_operation.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.workflows.api import task
+from aria.workflows.builtin.execute_operation import execute_operation
+
+from ... import mock
+from . import ctx_with_basic_graph
+
+
+@pytest.fixture
+def ctx():
+    return ctx_with_basic_graph()
+
+
+def test_execute_operation(ctx):
+    operation_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+    node_instance_id = 'dependency_node_instance'
+
+    execute_tasks = list(
+        task.WorkflowTask(
+            execute_operation,
+            ctx=ctx,
+            operation=operation_name,
+            operation_kwargs={},
+            allow_kwargs_override=False,
+            run_by_dependency_order=False,
+            type_names=[],
+            node_ids=[],
+            node_instance_ids=[node_instance_id]
+        ).topological_order()
+    )
+
+    assert len(execute_tasks) == 1
+    assert execute_tasks[0].name == '{0}.{1}'.format(node_instance_id, operation_name)
+
+# TODO: add more scenarios

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/builtin/test_heal.py
----------------------------------------------------------------------
diff --git a/tests/workflows/builtin/test_heal.py b/tests/workflows/builtin/test_heal.py
new file mode 100644
index 0000000..edffe2c
--- /dev/null
+++ b/tests/workflows/builtin/test_heal.py
@@ -0,0 +1,88 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.workflows.api import task
+from aria.workflows.builtin.heal import heal
+
+from . import (assert_node_install_operations,
+               assert_node_uninstall_operations,
+               ctx_with_basic_graph)
+
+
+@pytest.fixture
+def ctx():
+    return ctx_with_basic_graph()
+
+
+def test_heal_dependent_node(ctx):
+    heal_graph = task.WorkflowTask(heal, ctx=ctx, node_instance_id='dependent_node_instance')
+
+    assert len(list(heal_graph.tasks)) == 2
+    uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True))
+
+    assert len(list(uninstall_subgraph.tasks)) == 2
+    dependent_node_subgraph_uninstall, dependency_node_subgraph_uninstall = \
+        list(uninstall_subgraph.topological_order(reverse=True))
+
+    assert len(list(install_subgraph.tasks)) == 2
+    dependency_node_subgraph_install, dependent_node_subgraph_install = \
+        list(install_subgraph.topological_order(reverse=True))
+
+    dependent_node_uninstall_tasks = \
+        list(dependent_node_subgraph_uninstall.topological_order(reverse=True))
+    assert isinstance(dependency_node_subgraph_uninstall, task.StubTask)
+    dependent_node_install_tasks = \
+        list(dependent_node_subgraph_install.topological_order(reverse=True))
+    assert isinstance(dependency_node_subgraph_install, task.StubTask)
+
+    assert_node_uninstall_operations(dependent_node_uninstall_tasks, with_relationships=True)
+    assert_node_install_operations(dependent_node_install_tasks, with_relationships=True)
+
+
+def test_heal_dependency_node(ctx):
+    heal_graph = task.WorkflowTask(heal, ctx=ctx, node_instance_id='dependency_node_instance')
+    # both subgraphs should contain un/install tasks for the dependent and the dependency nodes
+    assert len(list(heal_graph.tasks)) == 2
+    uninstall_subgraph, install_subgraph = list(heal_graph.topological_order(reverse=True))
+
+    uninstall_tasks = list(uninstall_subgraph.topological_order(reverse=True))
+    assert len(uninstall_tasks) == 4
+    unlink_source, unlink_target = uninstall_tasks[:2]
+    dependent_node_subgraph_uninstall, dependency_node_subgraph_uninstall = uninstall_tasks[2:]
+
+    install_tasks = list(install_subgraph.topological_order(reverse=True))
+    assert len(install_tasks) == 4
+    dependency_node_subgraph_install, dependent_node_subgraph_install = install_tasks[:2]
+    establish_source, establish_target = install_tasks[2:]
+
+    assert isinstance(dependent_node_subgraph_uninstall, task.StubTask)
+    dependency_node_uninstall_tasks = \
+        list(dependency_node_subgraph_uninstall.topological_order(reverse=True))
+    assert isinstance(dependent_node_subgraph_install, task.StubTask)
+    dependency_node_install_tasks = \
+        list(dependency_node_subgraph_install.topological_order(reverse=True))
+
+    assert unlink_source.name.startswith('aria.interfaces.relationship_lifecycle.unlink')
+    assert unlink_target.name.startswith('aria.interfaces.relationship_lifecycle.unlink')
+    assert_node_uninstall_operations(dependency_node_uninstall_tasks)
+
+    assert_node_install_operations(dependency_node_install_tasks)
+    assert establish_source.name.startswith('aria.interfaces.relationship_lifecycle.establish')
+    assert establish_target.name.startswith('aria.interfaces.relationship_lifecycle.establish')
+
+
+# TODO: add tests for the contained-in scenario

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/builtin/test_install.py
----------------------------------------------------------------------
diff --git a/tests/workflows/builtin/test_install.py b/tests/workflows/builtin/test_install.py
new file mode 100644
index 0000000..29212e3
--- /dev/null
+++ b/tests/workflows/builtin/test_install.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.workflows.builtin.install import install
+from aria.workflows.api import task
+
+from . import (assert_node_install_operations,
+               ctx_with_basic_graph)
+
+
+@pytest.fixture
+def ctx():
+    return ctx_with_basic_graph()
+
+
+def test_install(ctx):
+    install_tasks = list(task.WorkflowTask(install, ctx=ctx).topological_order(True))
+
+    assert len(install_tasks) == 2
+    dependency_node_subgraph, dependent_node_subgraph = install_tasks
+    dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True))
+    dependency_node_tasks = list(dependency_node_subgraph.topological_order(reverse=True))
+
+    assert_node_install_operations(dependency_node_tasks)
+    assert_node_install_operations(dependent_node_tasks, with_relationships=True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/builtin/test_uninstall.py
----------------------------------------------------------------------
diff --git a/tests/workflows/builtin/test_uninstall.py b/tests/workflows/builtin/test_uninstall.py
new file mode 100644
index 0000000..2d00673
--- /dev/null
+++ b/tests/workflows/builtin/test_uninstall.py
@@ -0,0 +1,39 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.workflows.api import task
+from aria.workflows.builtin.uninstall import uninstall
+
+from . import (assert_node_uninstall_operations,
+               ctx_with_basic_graph)
+
+
+@pytest.fixture
+def ctx():
+    return ctx_with_basic_graph()
+
+
+def test_uninstall(ctx):
+    uninstall_tasks = list(task.WorkflowTask(uninstall, ctx=ctx).topological_order(True))
+
+    assert len(uninstall_tasks) == 2
+    dependent_node_subgraph, dependency_node_subgraph = uninstall_tasks
+    dependent_node_tasks = list(dependent_node_subgraph.topological_order(reverse=True))
+    dependency_node_tasks = list(dependency_node_subgraph.topological_order(reverse=True))
+
+    assert_node_uninstall_operations(operations=dependency_node_tasks)
+    assert_node_uninstall_operations(operations=dependent_node_tasks, with_relationships=True)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/core/__init__.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/__init__.py b/tests/workflows/core/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/tests/workflows/core/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/core/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_executor.py b/tests/workflows/core/test_executor.py
new file mode 100644
index 0000000..8ec0303
--- /dev/null
+++ b/tests/workflows/core/test_executor.py
@@ -0,0 +1,136 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import uuid
+from contextlib import contextmanager
+
+import pytest
+import retrying
+
+from aria import events
+from aria.storage import models
+from aria.workflows.executor import (
+    thread,
+    multiprocess,
+    blocking,
+    # celery
+)
+
+try:
+    import celery as _celery
+    app = _celery.Celery()
+    app.conf.update(CELERY_RESULT_BACKEND='amqp://')
+except ImportError:
+    _celery = None
+    app = None
+
+
+class TestExecutor(object):
+
+    @pytest.mark.parametrize('executor_cls,executor_kwargs', [
+        (thread.ThreadExecutor, {'pool_size': 1}),
+        (thread.ThreadExecutor, {'pool_size': 2}),
+        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
+        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
+        (blocking.CurrentThreadBlockingExecutor, {}),
+        # (celery.CeleryExecutor, {'app': app})
+    ])
+    def test_execute(self, executor_cls, executor_kwargs):
+        self.executor = executor_cls(**executor_kwargs)
+        expected_value = 'value'
+        successful_task = MockTask(mock_successful_task)
+        failing_task = MockTask(mock_failing_task)
+        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
+
+        for task in [successful_task, failing_task, task_with_inputs]:
+            self.executor.execute(task)
+
+        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
+        def assertion():
+            assert successful_task.states == ['start', 'success']
+            assert failing_task.states == ['start', 'failure']
+            assert task_with_inputs.states == ['start', 'failure']
+            assert isinstance(failing_task.exception, MockException)
+            assert isinstance(task_with_inputs.exception, MockException)
+            assert task_with_inputs.exception.message == expected_value
+        assertion()
+
+    def setup_method(self):
+        events.start_task_signal.connect(start_handler)
+        events.on_success_task_signal.connect(success_handler)
+        events.on_failure_task_signal.connect(failure_handler)
+
+    def teardown_method(self):
+        events.start_task_signal.disconnect(start_handler)
+        events.on_success_task_signal.disconnect(success_handler)
+        events.on_failure_task_signal.disconnect(failure_handler)
+        if hasattr(self, 'executor'):
+            self.executor.close()
+
+
+def mock_successful_task():
+    pass
+
+
+def mock_failing_task():
+    raise MockException
+
+
+def mock_task_with_input(input):
+    raise MockException(input)
+
+if app:
+    mock_successful_task = app.task(mock_successful_task)
+    mock_failing_task = app.task(mock_failing_task)
+    mock_task_with_input = app.task(mock_task_with_input)
+
+
+class MockException(Exception):
+    pass
+
+
+class MockTask(object):
+
+    def __init__(self, func, inputs=None):
+        self.states = []
+        self.exception = None
+        self.id = str(uuid.uuid4())
+        name = func.__name__
+        operation = 'tests.workflows.core.test_executor.{name}'.format(name=name)
+        self.operation_details = {'operation': operation}
+        self.logger = logging.getLogger()
+        self.name = name
+        self.inputs = inputs or {}
+
+        for state in models.Task.STATES:
+            setattr(self, state.upper(), state)
+
+    @contextmanager
+    def update(self):
+        yield self
+
+
+def start_handler(task, *args, **kwargs):
+    task.states.append('start')
+
+
+def success_handler(task, *args, **kwargs):
+    task.states.append('success')
+
+
+def failure_handler(task, exception, *args, **kwargs):
+    task.states.append('failure')
+    task.exception = exception

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/core/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_task_graph_into_exececution_graph.py b/tests/workflows/core/test_task_graph_into_exececution_graph.py
new file mode 100644
index 0000000..75e825f
--- /dev/null
+++ b/tests/workflows/core/test_task_graph_into_exececution_graph.py
@@ -0,0 +1,97 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from networkx import topological_sort, DiGraph
+
+from aria import context
+from aria.workflows import api, core
+
+from ... import mock
+
+
+def test_task_graph_into_execution_graph():
+    task_context = mock.context.simple()
+    node = mock.models.get_dependency_node()
+    node_instance = mock.models.get_dependency_node_instance()
+    task_context.model.node.store(node)
+    task_context.model.node_instance.store(node_instance)
+
+    def sub_workflow(name, **_):
+        return api.task_graph.TaskGraph(name)
+
+    with context.workflow.current.push(task_context):
+        test_task_graph = api.task.WorkflowTask(sub_workflow, name='test_task_graph')
+        simple_before_task = api.task.OperationTask('test_simple_before_task', {}, node_instance)
+        simple_after_task = api.task.OperationTask('test_simple_after_task', {}, node_instance)
+
+        inner_task_graph = api.task.WorkflowTask(sub_workflow, name='test_inner_task_graph')
+        inner_task = api.task.OperationTask('test_inner_task', {}, node_instance)
+        inner_task_graph.add_tasks(inner_task)
+
+    test_task_graph.add_tasks(simple_before_task)
+    test_task_graph.add_tasks(simple_after_task)
+    test_task_graph.add_tasks(inner_task_graph)
+    test_task_graph.add_dependency(inner_task_graph, simple_before_task)
+    test_task_graph.add_dependency(simple_after_task, inner_task_graph)
+
+    # Direct check
+    execution_graph = DiGraph()
+    core.translation.build_execution_graph(task_graph=test_task_graph,
+                                           execution_graph=execution_graph)
+    execution_tasks = topological_sort(execution_graph)
+
+    assert len(execution_tasks) == 7
+
+    expected_tasks_names = [
+        '{0}-Start'.format(test_task_graph.id),
+        simple_before_task.id,
+        '{0}-Start'.format(inner_task_graph.id),
+        inner_task.id,
+        '{0}-End'.format(inner_task_graph.id),
+        simple_after_task.id,
+        '{0}-End'.format(test_task_graph.id)
+    ]
+
+    assert expected_tasks_names == execution_tasks
+
+    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
+                      core.task.StartWorkflowTask)
+
+    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[1], execution_graph),
+                                  simple_before_task)
+    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
+                      core.task.StartSubWorkflowTask)
+
+    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[3], execution_graph),
+                                  inner_task)
+    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph),
+                      core.task.EndSubWorkflowTask)
+
+    _assert_execution_is_api_task(_get_task_by_name(execution_tasks[5], execution_graph),
+                                  simple_after_task)
+    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph),
+                      core.task.EndWorkflowTask)
+
+
+def _assert_execution_is_api_task(execution_task, api_task):
+    assert execution_task.id == api_task.id
+    assert execution_task.name == api_task.name
+    assert execution_task.operation_details == api_task.operation_details
+    assert execution_task.node_instance == api_task.node_instance
+    assert execution_task.inputs == api_task.inputs
+
+
+def _get_task_by_name(task_name, graph):
+    return graph.node[task_name]['task']

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_engine.py b/tests/workflows/test_engine.py
index 03a9d19..ea703f5 100644
--- a/tests/workflows/test_engine.py
+++ b/tests/workflows/test_engine.py
@@ -20,11 +20,14 @@ import pytest
 import aria
 from aria import events
 from aria import workflow
-from aria import contexts
+from aria import context
 from aria.storage import models
 from aria.workflows import exceptions
 from aria.workflows.executor import thread
 from aria.workflows.core import engine
+from aria.workflows import api
+
+from .. import mock
 
 import tests.storage
 
@@ -47,8 +50,8 @@ class TestEngine(object):
 
     def test_single_task_successful_execution(self, workflow_context, executor):
         @workflow
-        def mock_workflow(context, graph):
-            graph.add_task(self._op(mock_success_task, context))
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_success_task, ctx))
         self._execute(
             workflow_func=mock_workflow,
             workflow_context=workflow_context,
@@ -59,8 +62,8 @@ class TestEngine(object):
 
     def test_single_task_failed_execution(self, workflow_context, executor):
         @workflow
-        def mock_workflow(context, graph):
-            graph.add_task(self._op(mock_failed_task, context))
+        def mock_workflow(ctx, graph):
+            graph.add_tasks(self._op(mock_failed_task, ctx))
         with pytest.raises(exceptions.ExecutorException):
             self._execute(
                 workflow_func=mock_workflow,
@@ -72,10 +75,10 @@ class TestEngine(object):
 
     def test_two_tasks_execution_order(self, workflow_context, executor):
         @workflow
-        def mock_workflow(context, graph):
-            op1 = self._op(mock_ordered_task, context, inputs={'counter': 1})
-            op2 = self._op(mock_ordered_task, context, inputs={'counter': 2})
-            graph.chain(tasks=[op1, op2])
+        def mock_workflow(ctx, graph):
+            op1 = self._op(mock_ordered_task, ctx, inputs={'counter': 1})
+            op2 = self._op(mock_ordered_task, ctx, inputs={'counter': 2})
+            graph.sequence(op1, op2)
         self._execute(
             workflow_func=mock_workflow,
             workflow_context=workflow_context,
@@ -87,19 +90,17 @@ class TestEngine(object):
 
     @staticmethod
     def _execute(workflow_func, workflow_context, executor):
-        graph = workflow_func(context=workflow_context)
-        eng = engine.Engine(executor=executor,
-                            workflow_context=workflow_context,
-                            tasks_graph=graph)
+        graph = workflow_func(ctx=workflow_context)
+        eng = engine.Engine(executor=executor, workflow_context=workflow_context, tasks_graph=graph)
         eng.execute()
 
     @staticmethod
-    def _op(function, context, inputs=None):
-        return context.operation(
+    def _op(func, ctx, inputs=None):
+        return api.task.OperationTask(
             name='task',
-            node_instance=None,
             operation_details={'operation': 'tests.workflows.test_engine.{name}'.format(
-                name=function.__name__)},
+                name=func.__name__)},
+            node_instance=ctx.model.node_instance.get('dependency_node_instance'),
             inputs=inputs
         )
 
@@ -158,7 +159,11 @@ class TestEngine(object):
             updated_at=datetime.now(),
             workflows={})
         model_storage.deployment.store(deployment)
-        result = contexts.WorkflowContext(
+        node = mock.models.get_dependency_node()
+        node_instance = mock.models.get_dependency_node_instance(node)
+        model_storage.node.store(node)
+        model_storage.node_instance.store(node_instance)
+        result = context.workflow.WorkflowContext(
             name='test',
             model_storage=model_storage,
             resource_storage=None,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_executor.py b/tests/workflows/test_executor.py
deleted file mode 100644
index 7457fd0..0000000
--- a/tests/workflows/test_executor.py
+++ /dev/null
@@ -1,136 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import uuid
-
-import pytest
-import retrying
-
-from aria import events
-from aria.storage import models
-from aria.workflows.executor import (
-    thread,
-    multiprocess,
-    blocking,
-    # celery
-)
-
-try:
-    import celery as _celery
-    app = _celery.Celery()
-    app.conf.update(CELERY_RESULT_BACKEND='amqp://')
-except ImportError:
-    _celery = None
-    app = None
-
-
-class TestExecutor(object):
-
-    @pytest.mark.parametrize('executor_cls,executor_kwargs', [
-        (thread.ThreadExecutor, {'pool_size': 1}),
-        (thread.ThreadExecutor, {'pool_size': 2}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
-        (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
-        (blocking.CurrentThreadBlockingExecutor, {}),
-        # (celery.CeleryExecutor, {'app': app})
-    ])
-    def test_execute(self, executor_cls, executor_kwargs):
-        self.executor = executor_cls(**executor_kwargs)
-        expected_value = 'value'
-        successful_task = MockTask(mock_successful_task)
-        failing_task = MockTask(mock_failing_task)
-        task_with_inputs = MockTask(mock_task_with_input, inputs={'input': expected_value})
-
-        for task in [successful_task, failing_task, task_with_inputs]:
-            self.executor.execute(task)
-
-        @retrying.retry(stop_max_delay=10000, wait_fixed=100)
-        def assertion():
-            assert successful_task.states == ['start', 'success']
-            assert failing_task.states == ['start', 'failure']
-            assert task_with_inputs.states == ['start', 'failure']
-            assert isinstance(failing_task.exception, MockException)
-            assert isinstance(task_with_inputs.exception, MockException)
-            assert task_with_inputs.exception.message == expected_value
-        assertion()
-
-    def setup_method(self):
-        events.start_task_signal.connect(start_handler)
-        events.on_success_task_signal.connect(success_handler)
-        events.on_failure_task_signal.connect(failure_handler)
-
-    def teardown_method(self):
-        events.start_task_signal.disconnect(start_handler)
-        events.on_success_task_signal.disconnect(success_handler)
-        events.on_failure_task_signal.disconnect(failure_handler)
-        if self.executor:
-            self.executor.close()
-
-
-def mock_successful_task():
-    pass
-
-
-def mock_failing_task():
-    raise MockException
-
-
-def mock_task_with_input(input):
-    raise MockException(input)
-
-if app:
-    mock_successful_task = app.task(mock_successful_task)
-    mock_failing_task = app.task(mock_failing_task)
-    mock_task_with_input = app.task(mock_task_with_input)
-
-
-class MockException(Exception):
-    pass
-
-
-class MockContext(object):
-
-    def __init__(self, operation_details, inputs):
-        self.operation_details = operation_details
-        self.inputs = inputs
-        self.operation = models.Operation(execution_id='')
-
-
-class MockTask(object):
-
-    def __init__(self, func, inputs=None):
-        self.states = []
-        self.exception = None
-        self.id = str(uuid.uuid4())
-        name = func.__name__
-        operation = 'tests.workflows.test_executor.{name}'.format(name=name)
-        self.context = MockContext(operation_details={'operation': operation},
-                                   inputs=inputs or {})
-        self.logger = logging.getLogger()
-        self.name = name
-
-
-def start_handler(task, *args, **kwargs):
-    task.states.append('start')
-
-
-def success_handler(task, *args, **kwargs):
-    task.states.append('success')
-
-
-def failure_handler(task, exception, *args, **kwargs):
-    task.states.append('failure')
-    task.exception = exception

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tests/workflows/test_task_graph_into_exececution_graph.py
----------------------------------------------------------------------
diff --git a/tests/workflows/test_task_graph_into_exececution_graph.py b/tests/workflows/test_task_graph_into_exececution_graph.py
deleted file mode 100644
index 1bae713..0000000
--- a/tests/workflows/test_task_graph_into_exececution_graph.py
+++ /dev/null
@@ -1,79 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import pytest
-from networkx import topological_sort, DiGraph
-
-from aria import contexts
-from aria.workflows.api import tasks_graph
-from aria.workflows.core import tasks, translation
-
-
-@pytest.fixture(autouse=True)
-def no_storage(monkeypatch):
-    monkeypatch.setattr(tasks.OperationTask, '_create_operation_in_storage',
-                        value=lambda *args, **kwargs: None)
-
-
-def test_task_graph_into_execution_graph():
-    task_graph = tasks_graph.TaskGraph('test_task_graph')
-    simple_before_task = contexts.OperationContext('test_simple_before_task', {}, {}, None)
-    simple_after_task = contexts.OperationContext('test_simple_after_task', {}, {}, None)
-
-    inner_task_graph = tasks_graph.TaskGraph('test_inner_task_graph')
-    inner_task = contexts.OperationContext('test_inner_task', {}, {}, None)
-    inner_task_graph.add_task(inner_task)
-
-    task_graph.add_task(simple_before_task)
-    task_graph.add_task(simple_after_task)
-    task_graph.add_task(inner_task_graph)
-    task_graph.dependency(inner_task_graph, [simple_before_task])
-    task_graph.dependency(simple_after_task, [inner_task_graph])
-
-    # Direct check
-    execution_graph = DiGraph()
-    translation.build_execution_graph(task_graph=task_graph,
-                                      workflow_context=None,
-                                      execution_graph=execution_graph)
-    execution_tasks = topological_sort(execution_graph)
-
-    assert len(execution_tasks) == 7
-
-    expected_tasks_names = [
-        '{0}-Start'.format(task_graph.id),
-        simple_before_task.id,
-        '{0}-Start'.format(inner_task_graph.id),
-        inner_task.id,
-        '{0}-End'.format(inner_task_graph.id),
-        simple_after_task.id,
-        '{0}-End'.format(task_graph.id)
-    ]
-
-    assert expected_tasks_names == execution_tasks
-
-    assert isinstance(_get_task_by_name(execution_tasks[0], execution_graph),
-                      tasks.StartWorkflowTask)
-    assert simple_before_task == _get_task_by_name(execution_tasks[1], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[2], execution_graph),
-                      tasks.StartSubWorkflowTask)
-    assert inner_task == _get_task_by_name(execution_tasks[3], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[4], execution_graph), tasks.
-                      EndSubWorkflowTask)
-    assert simple_after_task == _get_task_by_name(execution_tasks[5], execution_graph).context
-    assert isinstance(_get_task_by_name(execution_tasks[6], execution_graph), tasks.EndWorkflowTask)
-
-
-def _get_task_by_name(task_name, graph):
-    return graph.node[task_name]['task']

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/39db8fc9/tox.ini
----------------------------------------------------------------------
diff --git a/tox.ini b/tox.ini
index e882ea8..8432c85 100644
--- a/tox.ini
+++ b/tox.ini
@@ -30,7 +30,7 @@ commands=pytest tests --cov-report term-missing --cov aria
 commands=pytest tests --cov-report term-missing --cov aria
 
 [testenv:pylint_code]
-commands=pylint aria --rcfile=aria/.pylintrc
+commands=pylint --rcfile=aria/.pylintrc --disable=fixme --ignore=commands.py aria
 
 [testenv:pylint_tests]
-commands=pylint tests --rcfile=tests/.pylintrc
+commands=pylint --rcfile=tests/.pylintrc --disable=fixme tests



Mime
View raw message