ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dankil...@apache.org
Subject incubator-ariatosca git commit: ARIA-41 Provide (initial) means for serializing an operation context object [Forced Update!]
Date Wed, 21 Dec 2016 17:39:04 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-41-limited-context-serialization d86214caf -> d143772d1 (forced update)


ARIA-41 Provide (initial) means for serializing an operation context object


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/d143772d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/d143772d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/d143772d

Branch: refs/heads/ARIA-41-limited-context-serialization
Commit: d143772d1497a54e447c1739083ed030328a7a0b
Parents: 5cf84ee
Author: Dan Kilman <dank@gigaspaces.com>
Authored: Mon Dec 19 17:05:54 2016 +0200
Committer: Dan Kilman <dank@gigaspaces.com>
Committed: Wed Dec 21 19:38:59 2016 +0200

----------------------------------------------------------------------
 aria/__init__.py                                |  26 ++---
 aria/orchestrator/context/common.py             |  11 +-
 aria/orchestrator/context/operation.py          |  21 ++--
 aria/orchestrator/context/serialization.py      |  94 +++++++++++++++++
 aria/orchestrator/context/workflow.py           |  13 ++-
 aria/orchestrator/workflows/core/task.py        |  11 +-
 aria/orchestrator/workflows/events_logging.py   |   6 +-
 aria/orchestrator/workflows/executor/process.py |  22 ++--
 aria/storage/filesystem_rapi.py                 |   5 +-
 tests/__init__.py                               |   4 +
 tests/mock/context.py                           |  18 +++-
 tests/orchestrator/context/test_serialize.py    | 101 +++++++++++++++++++
 tests/orchestrator/workflows/core/test_task.py  |   1 +
 .../workflows/executor/test_executor.py         |  19 +++-
 .../workflows/executor/test_process_executor.py |  11 +-
 15 files changed, 303 insertions(+), 60 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index 0f7bec6..cc362c0 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -41,8 +41,6 @@ __all__ = (
     'operation',
 )
 
-_resource_storage = {}
-
 
 def install_aria_extensions():
     """
@@ -61,7 +59,7 @@ def install_aria_extensions():
 
 def application_model_storage(api, api_kwargs=None):
     """
-    Initiate model storage for the supplied storage driver
+    Initiate model storage
     """
     models = [
         storage.models.Plugin,
@@ -85,17 +83,15 @@ def application_model_storage(api, api_kwargs=None):
     return storage.ModelStorage(api, items=models, api_kwargs=api_kwargs or {})
 
 
-def application_resource_storage(driver):
+def application_resource_storage(api, api_kwargs=None):
     """
-    Initiate resource storage for the supplied storage driver
+    Initiate resource storage
     """
-    if driver not in _resource_storage:
-        _resource_storage[driver] = storage.ResourceStorage(
-            driver,
-            resources=[
-                'blueprint',
-                'deployment',
-                'plugin',
-                'snapshot',
-            ])
-    return _resource_storage[driver]
+    return storage.ResourceStorage(
+        api,
+        api_kwargs=api_kwargs or {},
+        items=[
+            'blueprint',
+            'deployment',
+            'plugin',
+        ])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 14efd9d..fdbe152 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -29,13 +29,9 @@ class BaseContext(logger.LoggerMixin):
     def __init__(
             self,
             name,
+            deployment_id,
             model_storage,
             resource_storage,
-            deployment_id,
-            workflow_name,
-            task_max_attempts=1,
-            task_retry_interval=0,
-            task_ignore_failure=False,
             **kwargs):
         super(BaseContext, self).__init__(**kwargs)
         self._name = name
@@ -43,16 +39,11 @@ class BaseContext(logger.LoggerMixin):
         self._model = model_storage
         self._resource = resource_storage
         self._deployment_id = deployment_id
-        self._workflow_name = workflow_name
-        self._task_max_attempts = task_max_attempts
-        self._task_retry_interval = task_retry_interval
-        self._task_ignore_failure = task_ignore_failure
 
     def __repr__(self):
         return (
             '{name}(name={self.name}, '
             'deployment_id={self._deployment_id}, '
-            'workflow_name={self._workflow_name}, '
             .format(name=self.__class__.__name__, self=self))
 
     @property

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index a73bad1..19bb73a 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -26,17 +26,22 @@ class BaseOperationContext(BaseContext):
     Context object used during operation creation and execution
     """
 
-    def __init__(self, name, workflow_context, task, actor, **kwargs):
+    def __init__(self,
+                 name,
+                 model_storage,
+                 resource_storage,
+                 deployment_id,
+                 task_id,
+                 actor_id,
+                 **kwargs):
         super(BaseOperationContext, self).__init__(
             name=name,
-            model_storage=workflow_context.model,
-            resource_storage=workflow_context.resource,
-            deployment_id=workflow_context._deployment_id,
-            workflow_name=workflow_context._workflow_name,
+            model_storage=model_storage,
+            resource_storage=resource_storage,
+            deployment_id=deployment_id,
             **kwargs)
-        self._task_model = task
-        self._task_id = task.id
-        self._actor_id = actor.id
+        self._task_id = task_id
+        self._actor_id = actor_id
 
     def __repr__(self):
         details = 'operation_mapping={task.operation_mapping}; ' \

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/serialization.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/serialization.py b/aria/orchestrator/context/serialization.py
new file mode 100644
index 0000000..93cb38a
--- /dev/null
+++ b/aria/orchestrator/context/serialization.py
@@ -0,0 +1,94 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import sqlalchemy.orm
+import sqlalchemy.pool
+
+import aria
+
+
+def operation_context_to_dict(context):
+    context_cls = context.__class__
+    context_dict = {
+        'name': context.name,
+        'deployment_id': context._deployment_id,
+        'task_id': context._task_id,
+        'actor_id': context._actor_id,
+    }
+    if context.model:
+        model = context.model
+        context_dict['model_storage'] = {
+            'api_cls': model.api,
+            'api_kwargs': _serialize_sql_mapi_kwargs(model)
+        }
+    else:
+        context_dict['model_storage'] = None
+    if context.resource:
+        resource = context.resource
+        context_dict['resource_storage'] = {
+            'api_cls': resource.api,
+            'api_kwargs': _serialize_file_rapi_kwargs(resource)
+        }
+    else:
+        context_dict['resource_storage'] = None
+    return {
+        'context_cls': context_cls,
+        'context': context_dict
+    }
+
+
+def operation_context_from_dict(context_dict):
+    context_cls = context_dict['context_cls']
+    context = context_dict['context']
+
+    model_storage = context['model_storage']
+    if model_storage:
+        api_cls = model_storage['api_cls']
+        api_kwargs = _deserialize_sql_mapi_kwargs(model_storage.get('api_kwargs', {}))
+        context['model_storage'] = aria.application_model_storage(api=api_cls,
+                                                                  api_kwargs=api_kwargs)
+
+    resource_storage = context['resource_storage']
+    if resource_storage:
+        api_cls = resource_storage['api_cls']
+        api_kwargs = _deserialize_file_rapi_kwargs(resource_storage.get('api_kwargs', {}))
+        context['resource_storage'] = aria.application_resource_storage(api=api_cls,
+                                                                        api_kwargs=api_kwargs)
+
+    return context_cls(**context)
+
+
+def _serialize_sql_mapi_kwargs(model):
+    engine_url = str(model._api_kwargs['engine'].url)
+    assert ':memory:' not in engine_url
+    return {'engine_url': engine_url}
+
+
+def _deserialize_sql_mapi_kwargs(api_kwargs):
+    engine_url = api_kwargs.get('engine_url')
+    if not engine_url:
+        return {}
+    engine = sqlalchemy.create_engine(engine_url)
+    session_factory = sqlalchemy.orm.sessionmaker(bind=engine)
+    session = sqlalchemy.orm.scoped_session(session_factory=session_factory)
+    return {'session': session, 'engine': engine}
+
+
+def _serialize_file_rapi_kwargs(resource):
+    return {'directory': resource._api_kwargs['directory']}
+
+
+def _deserialize_file_rapi_kwargs(api_kwargs):
+    return api_kwargs

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index e2e8e25..e3be2d5 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -29,9 +29,20 @@ class WorkflowContext(BaseContext):
     """
     Context object used during workflow creation and execution
     """
-    def __init__(self, parameters=None, execution_id=None, *args, **kwargs):
+    def __init__(self,
+                 workflow_name,
+                 parameters=None,
+                 execution_id=None,
+                 task_max_attempts=1,
+                 task_retry_interval=0,
+                 task_ignore_failure=False,
+                 *args, **kwargs):
         super(WorkflowContext, self).__init__(*args, **kwargs)
+        self._workflow_name = workflow_name
         self.parameters = parameters or {}
+        self._task_max_attempts = task_max_attempts
+        self._task_retry_interval = task_retry_interval
+        self._task_ignore_failure = task_ignore_failure
         # TODO: execution creation should happen somewhere else
         # should be moved there, when such logical place exists
         self._execution_id = self._create_execution() if execution_id is None else execution_id

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index 08cf26e..663eeac 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -134,14 +134,17 @@ class OperationTask(BaseTask):
             max_attempts=api_task.max_attempts,
             retry_interval=api_task.retry_interval,
             ignore_failure=api_task.ignore_failure,
-            plugin_id=plugin_id
+            plugin_id=plugin_id,
+            execution_id=self._workflow_context.execution.id
         )
         self._workflow_context.model.task.put(operation_task)
 
         self._ctx = context_class(name=api_task.name,
-                                  workflow_context=self._workflow_context,
-                                  task=operation_task,
-                                  actor=operation_task.actor)
+                                  model_storage=self._workflow_context.model,
+                                  resource_storage=self._workflow_context.resource,
+                                  deployment_id=self._workflow_context._deployment_id,
+                                  task_id=operation_task.id,
+                                  actor_id=api_task.actor.id)
         self._task_id = operation_task.id
         self._update_fields = None
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/workflows/events_logging.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/events_logging.py b/aria/orchestrator/workflows/events_logging.py
index 409ce0a..142ef74 100644
--- a/aria/orchestrator/workflows/events_logging.py
+++ b/aria/orchestrator/workflows/events_logging.py
@@ -35,9 +35,9 @@ def _success_task_handler(task, **kwargs):
 
 
 @events.on_failure_task_signal.connect
-def _failure_operation_handler(task, **kwargs):
-    task.logger.error('Event: Task failure: {task.name}'.format(task=task),
-                      exc_info=kwargs.get('exception', True))
+def _failure_operation_handler(task, exception, **kwargs):
+    error = '{0}: {1}'.format(type(exception).__name__, exception)
+    task.logger.error('Event: Task failure: {task.name} [{error}]'.format(task=task, error=error))
 
 
 @events.start_workflow_signal.connect

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 1a47d4c..5da03dd 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -41,6 +41,7 @@ import jsonpickle
 
 from aria.utils import imports
 from aria.orchestrator.workflows.executor import base
+from aria.orchestrator.context import serialization
 
 _IS_WIN = os.name == 'nt'
 
@@ -106,12 +107,7 @@ class ProcessExecutor(base.BaseExecutor):
         file_descriptor, arguments_json_path = tempfile.mkstemp(prefix='executor-', suffix='.json')
         os.close(file_descriptor)
         with open(arguments_json_path, 'wb') as f:
-            f.write(jsonpickle.dumps({
-                'task_id': task.id,
-                'operation_mapping': task.operation_mapping,
-                'operation_inputs': task.inputs,
-                'port': self._server_port
-            }))
+            f.write(jsonpickle.dumps(self._create_arguments_dict(task)))
 
         env = os.environ.copy()
         # See _update_env for plugin_prefix usage
@@ -176,6 +172,15 @@ class ProcessExecutor(base.BaseExecutor):
         if self._stopped:
             raise RuntimeError('Executor closed')
 
+    def _create_arguments_dict(self, task):
+        return {
+            'task_id': task.id,
+            'operation_mapping': task.operation_mapping,
+            'operation_inputs': task.inputs,
+            'port': self._server_port,
+            'context': serialization.operation_context_to_dict(task.context),
+        }
+
     def _update_env(self, env, plugin_prefix):
         pythonpath_dirs = []
         # If this is a plugin operation, plugin prefix will point to where
@@ -261,12 +266,13 @@ def _main():
     task_id = arguments['task_id']
     port = arguments['port']
     messenger = _Messenger(task_id=task_id, port=port)
+    messenger.started()
 
     operation_mapping = arguments['operation_mapping']
     operation_inputs = arguments['operation_inputs']
-    ctx = None
-    messenger.started()
+    context_dict = arguments['context']
     try:
+        ctx = serialization.operation_context_from_dict(context_dict)
         task_func = imports.load_attribute(operation_mapping)
         task_func(ctx=ctx, **operation_inputs)
         messenger.succeeded()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/aria/storage/filesystem_rapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/filesystem_rapi.py b/aria/storage/filesystem_rapi.py
index f810f58..c6b3a81 100644
--- a/aria/storage/filesystem_rapi.py
+++ b/aria/storage/filesystem_rapi.py
@@ -87,7 +87,10 @@ class FileSystemResourceAPI(api.ResourceAPI):
             os.makedirs(self.directory)
         except (OSError, IOError):
             pass
-        os.makedirs(self.base_path)
+        try:
+            os.makedirs(self.base_path)
+        except (OSError, IOError):
+            pass
 
     def read(self, entry_id, path=None, **_):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/__init__.py
----------------------------------------------------------------------
diff --git a/tests/__init__.py b/tests/__init__.py
index ae1e83e..d2858d2 100644
--- a/tests/__init__.py
+++ b/tests/__init__.py
@@ -12,3 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+
+import os
+
+ROOT_DIR = os.path.dirname(os.path.dirname(__file__))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/mock/context.py
----------------------------------------------------------------------
diff --git a/tests/mock/context.py b/tests/mock/context.py
index 1904140..5559675 100644
--- a/tests/mock/context.py
+++ b/tests/mock/context.py
@@ -13,15 +13,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from aria import application_model_storage
+import aria
 from aria.orchestrator import context
+from aria.storage.filesystem_rapi import FileSystemResourceAPI
 from aria.storage.sql_mapi import SQLAlchemyModelAPI
 
 from . import models
 
 
-def simple(api_kwargs, **kwargs):
-    model_storage = application_model_storage(SQLAlchemyModelAPI, api_kwargs=api_kwargs)
+def simple(mapi_kwargs, resources_dir=None, **kwargs):
+    model_storage = aria.application_model_storage(SQLAlchemyModelAPI, api_kwargs=mapi_kwargs)
     blueprint = models.get_blueprint()
     model_storage.blueprint.put(blueprint)
     deployment = models.get_deployment(blueprint)
@@ -56,10 +57,19 @@ def simple(api_kwargs, **kwargs):
     )
     model_storage.relationship_instance.put(relationship_instance)
 
+    # pytest tmpdir
+    if resources_dir:
+        resource_storage = aria.application_resource_storage(
+            FileSystemResourceAPI,
+            api_kwargs={'directory': resources_dir}
+        )
+    else:
+        resource_storage = None
+
     final_kwargs = dict(
         name='simple_context',
         model_storage=model_storage,
-        resource_storage=None,
+        resource_storage=resource_storage,
         deployment_id=deployment.id,
         workflow_name=models.WORKFLOW_NAME,
         task_max_attempts=models.TASK_MAX_ATTEMPTS,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
new file mode 100644
index 0000000..ed0afcd
--- /dev/null
+++ b/tests/orchestrator/context/test_serialize.py
@@ -0,0 +1,101 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+import aria
+from aria.storage.sql_mapi import SQLAlchemyModelAPI
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+from aria.orchestrator.context import serialization
+
+import tests
+from tests import mock
+from tests import storage
+
+TEST_FILE_CONTENT = 'CONTENT'
+TEST_FILE_ENTRY_ID = 'entry'
+TEST_FILE_NAME = 'test_file'
+
+
+def test_serialize_operation_context(context, executor, tmpdir):
+    test_file = tmpdir.join(TEST_FILE_NAME)
+    test_file.write(TEST_FILE_CONTENT)
+    resource = context.resource
+    resource.blueprint.upload(TEST_FILE_ENTRY_ID, str(test_file))
+    graph = _mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
+    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+    eng.execute()
+
+
+def test_illegal_serialize_of_memory_model_storage(memory_model_storage):
+    with pytest.raises(AssertionError):
+        serialization._serialize_sql_mapi_kwargs(memory_model_storage)
+
+
+@workflow
+def _mock_workflow(ctx, graph):
+    node_instance = ctx.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+    node_instance.node.operations['test.op'] = {'operation': _operation_mapping()}
+    task = api.task.OperationTask.node_instance(instance=node_instance, name='test.op')
+    graph.add_tasks(task)
+    return graph
+
+
+@operation
+def _mock_operation(ctx):
+    # We test several things in this operation
+    # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created
+    # a correct ctx.task.operation_mapping tells us we kept the correct task_id
+    assert ctx.task.operation_mapping == _operation_mapping()
+    # a correct ctx.node.name tells us we kept the correct actor_id
+    assert ctx.node.name == mock.models.DEPENDENCY_NODE_NAME
+    # a correct ctx.name tells us we kept the correct name
+    assert ctx.name is not None
+    assert ctx.name == ctx.task.name
+    # a correct ctx.deployment.name tells us we kept the correct deployment_id
+    assert ctx.deployment.name == mock.models.DEPLOYMENT_NAME
+    # Here we test that the resource storage was properly re-created
+    test_file_content = ctx.resource.blueprint.read(TEST_FILE_ENTRY_ID, TEST_FILE_NAME)
+    assert test_file_content == TEST_FILE_CONTENT
+
+
+def _operation_mapping():
+    return '{name}.{func.__name__}'.format(name=__name__, func=_mock_operation)
+
+
+@pytest.fixture
+def executor():
+    result = process.ProcessExecutor(python_path=[tests.ROOT_DIR])
+    yield result
+    result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+    result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir)),
+                                 resources_dir=str(tmpdir.join('resources')))
+    yield result
+    storage.release_sqlite_storage(result.model)
+
+
+@pytest.fixture
+def memory_model_storage():
+    result = aria.application_model_storage(
+        SQLAlchemyModelAPI, api_kwargs=storage.get_sqlite_api_kwargs())
+    yield result
+    storage.release_sqlite_storage(result)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 6c3825c..fc11548 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -60,6 +60,7 @@ class TestOperationTask(object):
         node.operations['aria.interfaces.lifecycle.create'] = {'plugin': 'plugin1'}
         api_task, core_task = self._create_operation_task(ctx, node_instance)
         storage_task = ctx.model.task.get_by_name(core_task.name)
+        assert storage_task.execution_id == ctx.execution.id
         assert core_task.model_task == storage_task
         assert core_task.name == api_task.name
         assert core_task.operation_mapping == api_task.operation_mapping

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 7a11524..5ded4fb 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -14,7 +14,6 @@
 # limitations under the License.
 
 import logging
-import os
 import uuid
 from contextlib import contextmanager
 
@@ -29,7 +28,6 @@ except ImportError:
     _celery = None
     app = None
 
-import aria
 from aria.storage import models
 from aria.orchestrator import events
 from aria.orchestrator.workflows.executor import (
@@ -38,6 +36,8 @@ from aria.orchestrator.workflows.executor import (
     # celery
 )
 
+import tests
+
 
 def test_execute(executor):
     expected_value = 'value'
@@ -80,11 +80,20 @@ class MockException(Exception):
     pass
 
 
+class MockContext(object):
+
+    def __init__(self, *args, **kwargs):
+        pass
+
+    def __getattr__(self, item):
+        return None
+
+
 class MockTask(object):
 
     INFINITE_RETRIES = models.Task.INFINITE_RETRIES
 
-    def __init__(self, func, inputs=None, ctx=None):
+    def __init__(self, func, inputs=None):
         self.states = []
         self.exception = None
         self.id = str(uuid.uuid4())
@@ -94,7 +103,7 @@ class MockTask(object):
         self.logger = logging.getLogger()
         self.name = name
         self.inputs = inputs or {}
-        self.context = ctx
+        self.context = MockContext()
         self.retry_count = 0
         self.max_attempts = 1
         self.plugin_id = None
@@ -112,7 +121,7 @@ class MockTask(object):
     (thread.ThreadExecutor, {'pool_size': 2}),
     # subprocess needs to load a tests module so we explicitly add the root directory as
if
     # the project has been installed in editable mode
-    (process.ProcessExecutor, {'python_path': [os.path.dirname(os.path.dirname(aria.__file__))]}),
+    (process.ProcessExecutor, {'python_path': [tests.ROOT_DIR]}),
     # (celery.CeleryExecutor, {'app': app})
 ])
 def executor(request):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/d143772d/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index 364d354..0098f30 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -106,6 +106,15 @@ def mock_plugin(plugin_manager, tmpdir):
     return plugin_manager.install(source=plugin_path)
 
 
+class MockContext(object):
+
+    def __init__(self, *args, **kwargs):
+        pass
+
+    def __getattr__(self, item):
+        return None
+
+
 class MockTask(object):
 
     INFINITE_RETRIES = models.Task.INFINITE_RETRIES
@@ -116,7 +125,7 @@ class MockTask(object):
         self.logger = logging.getLogger()
         self.name = operation
         self.inputs = {}
-        self.context = None
+        self.context = MockContext()
         self.retry_count = 0
         self.max_attempts = 1
         self.plugin_id = plugin.id


Mime
View raw message