ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: ARIA-106-Create-sqla-logging-handler [Forced Update!]
Date Tue, 21 Feb 2017 14:37:18 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-106-Create-sqla-logging-handler 4995bc307 -> 91496db5e (forced update)


ARIA-106-Create-sqla-logging-handler


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/91496db5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/91496db5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/91496db5

Branch: refs/heads/ARIA-106-Create-sqla-logging-handler
Commit: 91496db5ed294c1e62be85cadf327a2a000c95e5
Parents: d347212
Author: mxmrlv <mxmrlv@gmail.com>
Authored: Mon Feb 13 12:28:27 2017 +0200
Committer: mxmrlv <mxmrlv@gmail.com>
Committed: Tue Feb 21 16:36:59 2017 +0200

----------------------------------------------------------------------
 aria/__init__.py                                |   3 +-
 aria/logger.py                                  |  49 +++++++-
 aria/orchestrator/context/common.py             |  42 ++++++-
 aria/orchestrator/context/operation.py          |  17 ++-
 aria/orchestrator/context/workflow.py           |   4 +
 aria/orchestrator/decorators.py                 |  12 +-
 aria/orchestrator/runner.py                     |   4 -
 aria/orchestrator/workflows/core/task.py        |  11 +-
 aria/orchestrator/workflows/executor/process.py |  26 +++-
 aria/orchestrator/workflows/executor/thread.py  |   2 +-
 aria/storage/__init__.py                        |  10 --
 aria/storage/core.py                            |  16 ++-
 aria/storage/modeling/model.py                  |   4 +
 aria/storage/modeling/orchestrator_elements.py  |  15 +++
 aria/storage/sql_mapi.py                        |   1 +
 aria/storage_initializer.py                     |   2 +-
 tests/mock/topology.py                          |   2 +-
 tests/orchestrator/context/test_operation.py    | 119 ++++++++++++++++---
 tests/orchestrator/context/test_serialize.py    |   2 +-
 .../execution_plugin/test_common.py             |   2 +-
 .../orchestrator/execution_plugin/test_local.py |   3 -
 tests/orchestrator/execution_plugin/test_ssh.py |   8 +-
 .../workflows/executor/test_executor.py         |  10 +-
 .../workflows/executor/test_process_executor.py |   6 +-
 tests/storage/__init__.py                       |  23 ++--
 tests/storage/test_instrumentation.py           |   6 +-
 tests/storage/test_structures.py                |   2 +-
 tests/test_logger.py                            |   3 +-
 28 files changed, 315 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index 18eaa56..43529f0 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -97,7 +97,8 @@ def application_model_storage(api, api_kwargs=None, initiator=None, initiator_kw
         storage.modeling.model.ServiceInstanceUpdateStep,
         storage.modeling.model.ServiceInstanceModification,
         storage.modeling.model.Plugin,
-        storage.modeling.model.Task
+        storage.modeling.model.Task,
+        storage.modeling.model.Log
     ]
     return storage.ModelStorage(api_cls=api,
                                 api_kwargs=api_kwargs,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 0002cb5..ae6d4a6 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -17,8 +17,10 @@
 Logging related mixins and functions
 """
 
+
 import logging
-from logging.handlers import RotatingFileHandler
+from logging import handlers as logging_handlers
+from datetime import datetime
 
 _base_logger = logging.getLogger('aria')
 
@@ -97,6 +99,10 @@ def create_console_log_handler(level=logging.DEBUG, formatter=None):
     return console
 
 
+def create_sqla_log_handler(session, engine, level=logging.DEBUG):
+    return _SQLAlchemyHandler(session=session, engine=engine, level=level)
+
+
 class _DefaultConsoleFormat(logging.Formatter):
     """
     _DefaultConsoleFormat class
@@ -106,10 +112,11 @@ class _DefaultConsoleFormat(logging.Formatter):
     """
     def format(self, record):
         try:
-            if record.levelno == logging.INFO:
-                self._fmt = '%(message)s'
+            if hasattr(record, 'prefix'):
+                self._fmt = '%(asctime)s: [%(levelname)s] @%(prefix)s ->%(message)s'
             else:
-                self._fmt = '%(levelname)s: %(message)s'
+                self._fmt = '%(asctime)s: [%(levelname)s] %(message)s'
+
         except AttributeError:
             return record.message
         return logging.Formatter.format(self, record)
@@ -124,7 +131,7 @@ def create_file_log_handler(
     """
     Create a logging.handlers.RotatingFileHandler
     """
-    rotating_file = RotatingFileHandler(
+    rotating_file = logging_handlers.RotatingFileHandler(
         filename=file_path,
         maxBytes=max_bytes,
         backupCount=backup_count,
@@ -135,5 +142,37 @@ def create_file_log_handler(
     return rotating_file
 
 
+class _SQLAlchemyHandler(logging.Handler):
+
+    def __init__(self, session, engine, **kwargs):
+        logging.Handler.__init__(self, **kwargs)
+        self._session = session
+        self._engine = engine
+
+        # Cyclic dependency
+        from aria.storage.modeling.model import Log
+        self._cls = Log
+
+    def emit(self, record):
+        # pylint fails to recognize that this class does indeed have __table__
+        self._cls.__table__.create(bind=self._engine, checkfirst=True)  # pylint: disable=no-member
+        created_at = datetime.strptime(logging.Formatter('%(asctime)s').formatTime(record),
+                                       '%Y-%m-%d %H:%M:%S,%f')
+        log = self._cls(
+            prefix=record.prefix,
+            logger=record.name,
+            level=record.levelname,
+            msg=record.msg,
+            created_at=created_at,
+        )
+        self._session.add(log)
+
+        try:
+            self._session.commit()
+        except BaseException:
+            self._session.rollback()
+            raise
+
+
 _default_file_formatter = logging.Formatter(
     '%(asctime)s [%(name)s:%(levelname)s] %(message)s <%(pathname)s:%(lineno)d>')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 37482cf..902598e 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -15,19 +15,33 @@
 """
 A common context for both workflow and operation
 """
+from contextlib import contextmanager
+from functools import partial
 from uuid import uuid4
 
+import logging
 import jinja2
 
-from aria import logger
+from aria import logger as aria_logger
 from aria.storage import exceptions
 
 
-class BaseContext(logger.LoggerMixin):
+class BaseContext(aria_logger.LoggerMixin):
     """
     Base context object for workflow and operation
     """
 
+    class PrefixedLogger(object):
+        def __init__(self, logger, prefix=''):
+            self._logger = logger
+            self._prefix = prefix
+
+        def __getattr__(self, item):
+            if item.upper() in logging._levelNames:
+                return partial(getattr(self._logger, item), extra={'prefix': self._prefix})
+            else:
+                return getattr(self._logger, item)
+
     def __init__(
             self,
             name,
@@ -42,14 +56,38 @@ class BaseContext(logger.LoggerMixin):
         self._model = model_storage
         self._resource = resource_storage
         self._service_instance_id = service_instance_id
+        # self._logger = self._init_logger(ctx_logger)
         self._workdir = workdir
 
+    def _get_sqla_handler(self):
+        api_kwargs = self._model._initiator(**self._model._initiator_kwargs)
+        api_kwargs.update(**self._model._api_kwargs)
+        return aria_logger.create_sqla_log_handler(**api_kwargs)
+
     def __repr__(self):
         return (
             '{name}(name={self.name}, '
             'deployment_id={self._service_instance_id}, '
             .format(name=self.__class__.__name__, self=self))
 
+    @contextmanager
+    def self_logging(self, handlers=None):
+        handlers = set(handlers) if handlers else set()
+        try:
+            handlers.add(aria_logger.create_console_log_handler())
+            handlers.add(self._get_sqla_handler())
+            for handler in handlers:
+                self.logger.addHandler(handler)
+            self.logger = self.PrefixedLogger(self.logger, self.logging_id)
+            yield self.logger
+        finally:
+            for handler in handlers:
+                self.logger.removeHandler(handler)
+
+    @property
+    def logging_id(self):
+        raise NotImplementedError
+
     @property
     def model(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index c5ac8f0..97a09aa 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -46,12 +46,16 @@ class BaseOperationContext(BaseContext):
         self._task = None
 
     def __repr__(self):
-        details = 'operation_mapping={task.operation_mapping}; ' \
+        details = 'implementation={task.implementation}; ' \
                   'operation_inputs={task.inputs}'\
             .format(task=self.task)
         return '{name}({0})'.format(details, name=self.name)
 
     @property
+    def logging_id(self):
+        raise NotImplementedError
+
+    @property
     def task(self):
         """
         The task in the model storage
@@ -105,6 +109,11 @@ class NodeOperationContext(BaseOperationContext):
     """
     Context for node based operations.
     """
+
+    @property
+    def logging_id(self):
+        return self.node.name or self.node.id
+
     @property
     def node_template(self):
         """
@@ -126,6 +135,12 @@ class RelationshipOperationContext(BaseOperationContext):
     """
     Context for relationship based operations.
     """
+
+    @property
+    def logging_id(self):
+        return '{0}:{1}'.format(self.source_node.name or self.source_node.id,
+                                self.target_node.name or self.target_node.id)
+
     @property
     def source_node_template(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 00ed974..63ece3a 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -65,6 +65,10 @@ class WorkflowContext(BaseContext):
         return execution.id
 
     @property
+    def logging_id(self):
+        return self._workflow_name
+
+    @property
     def execution(self):
         """
         The execution model

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/decorators.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py
index bbe43f4..f915813 100644
--- a/aria/orchestrator/decorators.py
+++ b/aria/orchestrator/decorators.py
@@ -54,12 +54,16 @@ def workflow(func=None, suffix_template=''):
     return _wrapper
 
 
-def operation(func=None, toolbelt=False, suffix_template=''):
+def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=None):
     """
     Operation decorator
     """
+
     if func is None:
-        return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt)
+        return partial(operation,
+                       suffix_template=suffix_template,
+                       toolbelt=toolbelt,
+                       logging_handlers=logging_handlers)
 
     @wraps(func)
     def _wrapper(**func_kwargs):
@@ -67,7 +71,9 @@ def operation(func=None, toolbelt=False, suffix_template=''):
             operation_toolbelt = context.toolbelt(func_kwargs['ctx'])
             func_kwargs.setdefault('toolbelt', operation_toolbelt)
         validate_function_arguments(func, func_kwargs)
-        return func(**func_kwargs)
+
+        with func_kwargs['ctx'].self_logging(handlers=logging_handlers):
+            return func(**func_kwargs)
     return _wrapper
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py
index 8927de9..bb92d1c 100644
--- a/aria/orchestrator/runner.py
+++ b/aria/orchestrator/runner.py
@@ -33,9 +33,6 @@ from .. import (
 )
 
 
-SQLITE_IN_MEMORY = 'sqlite:///:memory:'
-
-
 class Runner(object):
     """
     Runs workflows on a deployment. By default uses temporary storage (either on disk or in memory)
@@ -97,7 +94,6 @@ class Runner(object):
             task_max_attempts=1,
             task_retry_interval=1)
 
-
     def cleanup(self):
         if (self._is_storage_temporary and (self._storage_path is not None) and
                 os.path.isfile(self._storage_path)):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index d0e1363..e9a6d94 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -23,7 +23,6 @@ from functools import (
     wraps,
 )
 
-from aria import logger
 from aria.storage.modeling import model
 from aria.orchestrator.context import operation as operation_context
 
@@ -42,7 +41,7 @@ def _locked(func=None):
     return _wrapper
 
 
-class BaseTask(logger.LoggerMixin):
+class BaseTask(object):
     """
     Base class for Task objects
     """
@@ -151,6 +150,14 @@ class OperationTask(BaseTask):
         self._task_id = operation_task.id
         self._update_fields = None
 
+    @property
+    def self_logging(self):
+        return self._ctx.self_logging
+
+    @property
+    def logger(self):
+        return self._ctx.logger
+
     @contextmanager
     def _update(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 75bbbce..991fec6 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -200,7 +200,9 @@ class ProcessExecutor(base.BaseExecutor):
                     request_handler = self._request_handlers.get(request_type)
                     if not request_handler:
                         raise RuntimeError('Invalid request type: {0}'.format(request_type))
-                    request_handler(task_id=request['task_id'], request=request, response=response)
+                    task_id = request['task_id']
+                    # with self._tasks[task_id].self_logging():
+                    request_handler(task_id=task_id, request=request, response=response)
             except BaseException as e:
                 self.logger.debug('Error in process executor listener: {0}'.format(e))
 
@@ -251,23 +253,35 @@ class ProcessExecutor(base.BaseExecutor):
 
 
 def _send_message(connection, message):
+
+    # Packing the length of the entire msg using struct.pack.
+    # This enables later reading of the content.
+    def _pack(data):
+        return struct.pack(_INT_FMT, len(data))
+
     data = jsonpickle.dumps(message)
-    connection.send(struct.pack(_INT_FMT, len(data)))
+    msg_metadata = _pack(data)
+    connection.send(msg_metadata)
     connection.sendall(data)
 
 
 def _recv_message(connection):
-    message_len, = struct.unpack(_INT_FMT, _recv_bytes(connection, _INT_SIZE))
-    return jsonpickle.loads(_recv_bytes(connection, message_len))
+    # Retrieving the length of the msg to come.
+    def _unpack(conn):
+        return struct.unpack(_INT_FMT, _recv_bytes(conn, _INT_SIZE, wait=True))[0]
+
+    msg_metadata_len = _unpack(connection)
+    msg = _recv_bytes(connection, msg_metadata_len)
+    return jsonpickle.loads(msg)
 
 
-def _recv_bytes(connection, count):
+def _recv_bytes(connection, count, wait=False):
     result = io.BytesIO()
     while True:
         if not count:
             return result.getvalue()
         read = connection.recv(count)
-        if not read:
+        if not wait and not read:
             return result.getvalue()
         result.write(read)
         count -= len(read)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 7ae0217..6c59986 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -63,5 +63,5 @@ class ThreadExecutor(BaseExecutor):
                 except BaseException as e:
                     self._task_failed(task, exception=e)
             # Daemon threads
-            except BaseException:
+            except BaseException as e:
                 pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py
index eaadc7e..45af1be 100644
--- a/aria/storage/__init__.py
+++ b/aria/storage/__init__.py
@@ -42,12 +42,6 @@ from .core import (
     ModelStorage,
     ResourceStorage,
 )
-from .modeling import (
-    structure,
-    model,
-    model_base,
-    type
-)
 from . import (
     exceptions,
     api,
@@ -58,14 +52,10 @@ from . import (
 
 __all__ = (
     'exceptions',
-    'structure',
     'Storage',
     'ModelStorage',
     'ResourceStorage',
     'filesystem_rapi',
     'sql_mapi',
     'api',
-    'model',
-    'model_base',
-    'type',
 )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index 0e189e6..7d70070 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -84,6 +84,12 @@ class Storage(LoggerMixin):
         self.logger.debug('{name} object is ready: {0!r}'.format(
             self, name=self.__class__.__name__))
 
+    @property
+    def all_api_kwargs(self):
+        kwargs = self._api_kwargs.copy()
+        kwargs.update(self._additional_api_kwargs)
+        return kwargs
+
     def __repr__(self):
         return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self)
 
@@ -121,9 +127,7 @@ class ResourceStorage(Storage):
         :param name:
         :return:
         """
-        kwargs = self._api_kwargs.copy()
-        kwargs.update(self._additional_api_kwargs)
-        self.registered[name] = self.api(name=name, **kwargs)
+        self.registered[name] = self.api(name=name, **self.all_api_kwargs)
         self.registered[name].create()
         self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self))
 
@@ -148,9 +152,9 @@ class ModelStorage(Storage):
             self.logger.debug('{name} in already storage {self!r}'.format(name=model_name,
                                                                           self=self))
             return
-        kwargs = self._api_kwargs.copy()
-        kwargs.update(self._additional_api_kwargs)
-        self.registered[model_name] = self.api(name=model_name, model_cls=model_cls, **kwargs)
+        self.registered[model_name] = self.api(name=model_name,
+                                               model_cls=model_cls,
+                                               **self.all_api_kwargs)
         self.registered[model_name].create()
         self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self))
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage/modeling/model.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/model.py b/aria/storage/modeling/model.py
index 62b90b3..cf7d933 100644
--- a/aria/storage/modeling/model.py
+++ b/aria/storage/modeling/model.py
@@ -216,4 +216,8 @@ class Plugin(aria_declarative_base, orchestrator_elements.PluginBase):
 
 class Task(aria_declarative_base, orchestrator_elements.TaskBase):
     pass
+
+
+class Log(aria_declarative_base, orchestrator_elements.LogBase):
+    pass
 # endregion

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage/modeling/orchestrator_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py
index 5f7a3f2..47fe49f 100644
--- a/aria/storage/modeling/orchestrator_elements.py
+++ b/aria/storage/modeling/orchestrator_elements.py
@@ -466,3 +466,18 @@ class TaskBase(ModelMixin):
     @staticmethod
     def retry(message=None, retry_interval=None):
         raise TaskRetryException(message, retry_interval=retry_interval)
+
+
+class LogBase(ModelMixin):
+    __tablename__ = 'log'
+
+    logger = Column(String)
+    level = Column(String)
+    msg = Column(String)
+    created_at = Column(DateTime, index=True)
+    prefix = Column(String)
+    description = Column(String)
+
+    def __repr__(self):
+        return "{self.created_at}: [{self.level}] @{self.prefix} ->{msg}".format(
+            self=self, msg=self.msg[:50])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
index 2711aee..8bbec54 100644
--- a/aria/storage/sql_mapi.py
+++ b/aria/storage/sql_mapi.py
@@ -145,6 +145,7 @@ class SQLAlchemyModelAPI(api.ModelAPI):
 
     def create(self, checkfirst=True, create_all=True, **kwargs):
         self.model_cls.__table__.create(self._engine, checkfirst=checkfirst)
+
         if create_all:
             # In order to create any models created dynamically (e.g. many-to-many helper tables are
             # created at runtime).

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage_initializer.py
----------------------------------------------------------------------
diff --git a/aria/storage_initializer.py b/aria/storage_initializer.py
index 175ec22..8c154df 100644
--- a/aria/storage_initializer.py
+++ b/aria/storage_initializer.py
@@ -16,7 +16,7 @@
 from datetime import datetime
 from threading import RLock
 
-from .storage import model
+from .storage.modeling import model
 from .orchestrator import operation
 from .utils.formatting import safe_repr
 from .utils.console import puts, Colored

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/mock/topology.py
----------------------------------------------------------------------
diff --git a/tests/mock/topology.py b/tests/mock/topology.py
index b04fb46..d3e8b7b 100644
--- a/tests/mock/topology.py
+++ b/tests/mock/topology.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from aria.storage import model
+from aria.storage.modeling import model
 
 from . import models
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 3f39979..c2f5fd0 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -15,16 +15,21 @@
 
 import os
 
+import logging
+import tempfile
+
 import pytest
 
+from aria.orchestrator.workflows.executor import process, thread
+
 from aria import (
     workflow,
     operation,
 )
 from aria.orchestrator import context
 from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.executor import thread
 
+import tests
 from tests import mock, storage
 from . import (
     op_path,
@@ -38,16 +43,27 @@ global_test_holder = {}
 @pytest.fixture
 def ctx(tmpdir):
     context = mock.context.simple(
-        str(tmpdir.join('workdir')),
-        inmemory=True,
+        str(tmpdir),
         context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
     )
     yield context
     storage.release_sqlite_storage(context.model)
 
 
+tmp_file = os.path.join(tempfile.gettempdir(), 'op_testing_const_file_name')
+
+
+@pytest.fixture(autouse=True)
+def tmpfile_cleanup():
+    try:
+        yield
+    finally:
+        if os.path.isfile(tmp_file):
+            os.remove(tmp_file)
+
+
 @pytest.fixture
-def executor():
+def thread_executor():
     result = thread.ThreadExecutor()
     try:
         yield result
@@ -55,13 +71,23 @@ def executor():
         result.close()
 
 
-def test_node_operation_task_execution(ctx, executor):
+@pytest.fixture(params=[(thread.ThreadExecutor()),
+                        (process.ProcessExecutor(python_path=tests.ROOT_DIR))])
+def executor(request):
+    ex = request.param
+    try:
+        yield ex
+    finally:
+        ex.close()
+
+
+def test_node_operation_task_execution(ctx, thread_executor):
     operation_name = 'aria.interfaces.lifecycle.create'
 
     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
     interface = mock.models.get_interface(
         operation_name,
-        operation_kwargs=dict(implementation=op_path(my_operation, module_path=__name__))
+        operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__))
     )
     node.interfaces = [interface]
     ctx.model.node.update(node)
@@ -77,7 +103,7 @@ def test_node_operation_task_execution(ctx, executor):
             )
         )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
 
     operation_context = global_test_holder[op_name(node, operation_name)]
 
@@ -96,13 +122,13 @@ def test_node_operation_task_execution(ctx, executor):
     assert operation_context.node == node
 
 
-def test_relationship_operation_task_execution(ctx, executor):
+def test_relationship_operation_task_execution(ctx, thread_executor):
     operation_name = 'aria.interfaces.relationship_lifecycle.post_configure'
     relationship = ctx.model.relationship.list()[0]
 
     interface = mock.models.get_interface(
         operation_name=operation_name,
-        operation_kwargs=dict(implementation=op_path(my_operation, module_path=__name__)),
+        operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__)),
         edge='source'
     )
 
@@ -121,7 +147,7 @@ def test_relationship_operation_task_execution(ctx, executor):
             )
         )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
 
     operation_context = global_test_holder[op_name(relationship,
                                                    operation_name)]
@@ -148,12 +174,12 @@ def test_relationship_operation_task_execution(ctx, executor):
     assert operation_context.source_node == dependent_node
 
 
-def test_invalid_task_operation_id(ctx, executor):
+def test_invalid_task_operation_id(ctx, thread_executor):
     """
     Checks that the right id is used. The task created with id == 1, thus running the task on
     node_instance with id == 2. will check that indeed the node_instance uses the correct id.
     :param ctx:
-    :param executor:
+    :param thread_executor:
     :return:
     """
     operation_name = 'aria.interfaces.lifecycle.create'
@@ -174,14 +200,14 @@ def test_invalid_task_operation_id(ctx, executor):
             api.task.OperationTask.node(name=operation_name, instance=node)
         )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
 
     op_node_instance_id = global_test_holder[op_name(node, operation_name)]
     assert op_node_instance_id == node.id
     assert op_node_instance_id != other_node.id
 
 
-def test_plugin_workdir(ctx, executor, tmpdir):
+def test_plugin_workdir(ctx, thread_executor, tmpdir):
     op = 'test.op'
     plugin_name = 'mock_plugin'
     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
@@ -203,15 +229,76 @@ def test_plugin_workdir(ctx, executor, tmpdir):
         graph.add_tasks(api.task.OperationTask.node(
             name=op, instance=node, inputs=inputs))
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
     expected_file = tmpdir.join('workdir', 'plugins', str(ctx.service_instance.id),
                                 plugin_name,
                                 filename)
     assert expected_file.read() == content
 
 
+def test_operation_logging(ctx, executor):
+    operation_name = 'aria.interfaces.lifecycle.create'
+
+    node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+    interface = mock.models.get_interface(
+        operation_name,
+        operation_kwargs=dict(implementation=op_path(logged_operation, module_path=__name__))
+    )
+    node.interfaces = [interface]
+    ctx.model.node.update(node)
+
+    inputs = {
+        'op_start': 'op_start',
+        'op_end': 'op_end',
+    }
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask.node(
+                name=operation_name,
+                instance=node,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+
+    logs = ctx.model.log.list()
+
+    assert len(logs) == 2
+
+    op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info']
+    assert len(op_start_log) == 1
+    op_start_log = op_start_log[0]
+
+    op_end_log = [l for l in logs if inputs['op_end'] in l.msg and l.level.lower() == 'debug']
+    assert len(op_end_log) == 1
+    op_end_log = op_end_log[0]
+
+    assert op_start_log.created_at < op_end_log.created_at
+
+    with open(tmp_file, 'r') as f:
+        logs = [l.strip() for l in f.readlines()]
+
+    assert inputs['op_start'] in logs
+    assert inputs['op_end'] in logs
+
+
+class MockLogHandler(logging.Handler):
+    def emit(self, record):
+        with open(tmp_file, 'a+') as f:
+            f.write(record.msg + '\n')
+
+
+@operation(logging_handlers=[MockLogHandler()])
+def logged_operation(ctx, **_):
+    ctx.logger.info(ctx.task.inputs['op_start'])
+    ctx.logger.debug(ctx.task.inputs['op_end'])
+
+
 @operation
-def my_operation(ctx, **_):
+def basic_operation(ctx, **_):
     global_test_holder[ctx.name] = ctx
 
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 1fdcb1a..03f9529 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -58,7 +58,7 @@ def _mock_workflow(ctx, graph):
 def _mock_operation(ctx):
     # We test several things in this operation
     # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created
-    # a correct ctx.task.operation_mapping tells us we kept the correct task_id
+    # a correct ctx.task.implementation tells us we kept the correct task_id
     assert ctx.task.implementation == _operation_mapping()
     # a correct ctx.node.name tells us we kept the correct actor_id
     assert ctx.node.name == mock.models.DEPENDENCY_NODE_INSTANCE_NAME

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/execution_plugin/test_common.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_common.py b/tests/orchestrator/execution_plugin/test_common.py
index 1048c27..151b996 100644
--- a/tests/orchestrator/execution_plugin/test_common.py
+++ b/tests/orchestrator/execution_plugin/test_common.py
@@ -18,7 +18,7 @@ from collections import namedtuple
 import requests
 import pytest
 
-from aria.storage import model
+from aria.storage.modeling import model
 from aria.orchestrator import exceptions
 from aria.orchestrator.execution_plugin import common
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 9e9540f..5224496 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -483,9 +483,6 @@ if __name__ == '__main__':
                     operations.__name__,
                     operations.run_script_locally.__name__))
             )]
-            # node.operations[op] = {
-            #     'operation': '{0}.{1}'.format(operations.__name__,
-            #                                   operations.run_script_locally.__name__)}
             graph.add_tasks(api.task.OperationTask.node(
                 instance=node,
                 name=op,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 2e270bb..cf4c7e1 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -24,7 +24,7 @@ import fabric.api
 from fabric.contrib import files
 from fabric import context_managers
 
-from aria.storage import model
+from aria.storage.modeling import model
 from aria.orchestrator import events
 from aria.orchestrator import workflow
 from aria.orchestrator.workflows import api
@@ -413,6 +413,12 @@ class TestFabricEnvHideGroupsAndRunCommands(object):
         task.runs_on = Stub
         logger = logging.getLogger()
 
+    @staticmethod
+    @contextlib.contextmanager
+    def _mock_self_logging(*args, **kwargs):
+        yield
+    _Ctx.self_logging = _mock_self_logging
+
     @pytest.fixture(autouse=True)
     def _setup(self, mocker):
         self.default_fabric_env = {

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 580bf8b..64dfb66 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -83,7 +83,7 @@ class MockException(Exception):
 class MockContext(object):
 
     def __init__(self, *args, **kwargs):
-        pass
+        self.logger = logging.getLogger()
 
     def __getattr__(self, item):
         if item == 'serialization_dict':
@@ -95,6 +95,10 @@ class MockContext(object):
     def deserialize_from_dict(cls, **kwargs):
         return cls()
 
+    @contextmanager
+    def self_logging(self):
+        yield self.logger
+
 
 class MockTask(object):
 
@@ -124,6 +128,10 @@ class MockTask(object):
     def _update(self):
         yield self
 
+    @contextmanager
+    def self_logging(self):
+        yield self.logger
+
 
 @pytest.fixture(params=[
     (thread.ThreadExecutor, {'pool_size': 1}),

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index e904eb3..1bdc1e4 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -22,10 +22,8 @@ from contextlib import contextmanager
 import pytest
 
 from aria import application_model_storage
-from aria.storage import (
-    model as aria_model,
-    sql_mapi
-)
+from aria.storage import sql_mapi
+from aria.storage.modeling import model as aria_model
 from aria.orchestrator import (
     events,
     plugin

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/storage/__init__.py
----------------------------------------------------------------------
diff --git a/tests/storage/__init__.py b/tests/storage/__init__.py
index 4278831..5323d01 100644
--- a/tests/storage/__init__.py
+++ b/tests/storage/__init__.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import os
-import platform
+
 from shutil import rmtree
 from tempfile import mkdtemp
 
@@ -23,19 +22,19 @@ from sqlalchemy import (
     Column,
     Text,
     Integer,
-    pool
+    pool,
+    MetaData
 )
 
 
-from aria.storage import (
+from aria.storage.modeling import (
     model,
-    type as aria_type,
     structure,
-    modeling
+    type as aria_type
 )
 
 
-class MockModel(model.aria_declarative_base, structure.ModelMixin): #pylint: disable=abstract-method
+class MockModel(structure.ModelMixin, model.aria_declarative_base): #pylint: disable=abstract-method
     __tablename__ = 'mock_model'
     model_dict = Column(aria_type.Dict)
     model_list = Column(aria_type.List)
@@ -58,14 +57,8 @@ def release_sqlite_storage(storage):
     :param storage:
     :return:
     """
-    mapis = storage.registered.values()
-
-    if mapis:
-        for session in set(mapi._session for mapi in mapis):
-            session.rollback()
-            session.close()
-        for engine in set(mapi._engine for mapi in mapis):
-            model.aria_declarative_base.metadata.drop_all(engine)
+    storage.all_api_kwargs['session'].close()
+    MetaData(bind=storage.all_api_kwargs['engine']).drop_all()
 
 
 def init_inmemory_model_storage():

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
index 08d5ae0..7f0eb02 100644
--- a/tests/storage/test_instrumentation.py
+++ b/tests/storage/test_instrumentation.py
@@ -17,13 +17,15 @@ import pytest
 from sqlalchemy import Column, Text, Integer, event
 
 from aria.storage import (
-    structure,
     ModelStorage,
     sql_mapi,
     instrumentation,
     exceptions,
+)
+from aria.storage.modeling import (
+    model,
     type as aria_type,
-    model
+    structure,
 )
 from ..storage import release_sqlite_storage, init_inmemory_model_storage
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/storage/test_structures.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_structures.py b/tests/storage/test_structures.py
index 666256e..27e99d7 100644
--- a/tests/storage/test_structures.py
+++ b/tests/storage/test_structures.py
@@ -21,9 +21,9 @@ from aria.storage import (
     ModelStorage,
     sql_mapi,
     exceptions,
-    type,
     modeling,
 )
+from aria.storage.modeling import type
 
 from ..storage import release_sqlite_storage, structure, init_inmemory_model_storage
 from . import MockModel

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/test_logger.py
----------------------------------------------------------------------
diff --git a/tests/test_logger.py b/tests/test_logger.py
index 70f08bb..1ad055c 100644
--- a/tests/test_logger.py
+++ b/tests/test_logger.py
@@ -53,7 +53,8 @@ def test_create_console_log_handler(capsys):
     logger.info(info_test_string)
     logger.debug(debug_test_string)
     _, err = capsys.readouterr()
-    assert err.count('DEBUG: {test_string}'.format(test_string=debug_test_string)) == 1
+
+    assert err.count('[DEBUG] {test_string}'.format(test_string=debug_test_string))
     assert err.count(info_test_string) == 1
 
     # Custom handler



Mime
View raw message