From: mxmrlv@apache.org
To: dev@ariatosca.incubator.apache.org
Reply-To: dev@ariatosca.incubator.apache.org
Message-Id: <7dabb113d96244189f3d1b1b21ae71b6@git.apache.org>
X-Mailer: ASF-Git Admin Mailer
Subject: incubator-ariatosca git commit: ARIA-106-Create-sqla-logging-handler [Forced Update!]
Date: Tue, 21 Feb 2017 14:37:18 +0000 (UTC)
archived-at: Tue, 21 Feb 2017 14:38:26 -0000

Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-106-Create-sqla-logging-handler 4995bc307 -> 91496db5e (forced update)


ARIA-106-Create-sqla-logging-handler


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/91496db5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/91496db5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/91496db5

Branch: refs/heads/ARIA-106-Create-sqla-logging-handler
Commit: 91496db5ed294c1e62be85cadf327a2a000c95e5
Parents: d347212
Author: mxmrlv
Authored: Mon Feb 13 12:28:27 2017 +0200
Committer: mxmrlv
Committed: Tue Feb 21 16:36:59 2017 +0200

----------------------------------------------------------------------
 aria/__init__.py                                |   3 +-
 aria/logger.py                                  |  49 +++++++-
 aria/orchestrator/context/common.py             |  42 ++++++-
 aria/orchestrator/context/operation.py          |  17 ++-
 aria/orchestrator/context/workflow.py           |   4 +
 aria/orchestrator/decorators.py                 |  12 +-
 aria/orchestrator/runner.py                     |   4 -
 aria/orchestrator/workflows/core/task.py        |  11 +-
 aria/orchestrator/workflows/executor/process.py |  26 +++-
 aria/orchestrator/workflows/executor/thread.py  |   2 +-
 aria/storage/__init__.py                        |  10 --
 aria/storage/core.py                            |  16 ++-
 aria/storage/modeling/model.py                  |   4 +
 aria/storage/modeling/orchestrator_elements.py  |  15 +++
 aria/storage/sql_mapi.py                        |   1 +
 aria/storage_initializer.py                     |   2 +-
 tests/mock/topology.py                          |   2 +-
 tests/orchestrator/context/test_operation.py    | 119 ++++++++++++++++---
 tests/orchestrator/context/test_serialize.py    |   2 +-
 .../execution_plugin/test_common.py             |   2 +-
 .../orchestrator/execution_plugin/test_local.py |   3 -
 tests/orchestrator/execution_plugin/test_ssh.py |   8 +-
 .../workflows/executor/test_executor.py         |  10 +-
 .../workflows/executor/test_process_executor.py |   6 +-
 tests/storage/__init__.py                       |  23 ++--
 tests/storage/test_instrumentation.py           |   6 +-
 tests/storage/test_structures.py                |   2 +-
 tests/test_logger.py                            |   3 +-
 28 files changed, 315 insertions(+), 89 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index 18eaa56..43529f0 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -97,7 +97,8 @@ def application_model_storage(api, api_kwargs=None, initiator=None, initiator_kw
         storage.modeling.model.ServiceInstanceUpdateStep,
         storage.modeling.model.ServiceInstanceModification,
         storage.modeling.model.Plugin,
-        storage.modeling.model.Task
+        storage.modeling.model.Task,
+        storage.modeling.model.Log
     ]
     return storage.ModelStorage(api_cls=api,
                                 api_kwargs=api_kwargs,
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 0002cb5..ae6d4a6 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -17,8 +17,10 @@
 Logging related mixins and functions
 """
+
 import logging
-from logging.handlers import RotatingFileHandler
+from logging import handlers as logging_handlers
+from datetime import datetime
 
 _base_logger = logging.getLogger('aria')
 
@@ -97,6 +99,10 @@ def create_console_log_handler(level=logging.DEBUG, formatter=None):
     return console
 
 
+def create_sqla_log_handler(session, engine, level=logging.DEBUG):
+    return _SQLAlchemyHandler(session=session, engine=engine, level=level)
+
+
 class _DefaultConsoleFormat(logging.Formatter):
     """
     _DefaultConsoleFormat class
@@ -106,10 +112,11 @@ class _DefaultConsoleFormat(logging.Formatter):
     """
     def format(self, record):
         try:
-            if record.levelno == logging.INFO:
-                self._fmt = '%(message)s'
+            if hasattr(record, 'prefix'):
+                self._fmt = '%(asctime)s: [%(levelname)s] @%(prefix)s ->%(message)s'
             else:
-                self._fmt = '%(levelname)s: %(message)s'
+                self._fmt = '%(asctime)s: [%(levelname)s] %(message)s'
+
         except AttributeError:
             return record.message
         return logging.Formatter.format(self, record)
@@ -124,7 +131,7 @@ def create_file_log_handler(
     """
     Create a logging.handlers.RotatingFileHandler
     """
-    rotating_file = RotatingFileHandler(
+    rotating_file = logging_handlers.RotatingFileHandler(
         filename=file_path,
         maxBytes=max_bytes,
         backupCount=backup_count,
@@ -135,5 +142,37 @@ def create_file_log_handler(
     return rotating_file
 
 
+class _SQLAlchemyHandler(logging.Handler):
+
+    def __init__(self, session, engine, **kwargs):
+        logging.Handler.__init__(self, **kwargs)
+        self._session = session
+        self._engine = engine
+
+        # Cyclic dependency
+        from aria.storage.modeling.model import Log
+        self._cls = Log
+
+    def emit(self, record):
+        # pylint fails to recognize that this class does indeed have __table__
+        self._cls.__table__.create(bind=self._engine, checkfirst=True)  # pylint: disable=no-member
+        created_at = datetime.strptime(logging.Formatter('%(asctime)s').formatTime(record),
+                                       '%Y-%m-%d %H:%M:%S,%f')
+        log = self._cls(
+            prefix=record.prefix,
+            logger=record.name,
+            level=record.levelname,
+            msg=record.msg,
+            created_at=created_at,
+        )
+        self._session.add(log)
+
+        try:
+            self._session.commit()
+        except BaseException:
+            self._session.rollback()
+            raise
+
+
 _default_file_formatter = logging.Formatter(
     '%(asctime)s [%(name)s:%(levelname)s] %(message)s <%(pathname)s:%(lineno)d>')
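Note on the new handler: it only needs a SQLAlchemy session and engine, so it can be exercised on
its own. The snippet below is a minimal, hypothetical sketch (not part of this commit) that wires
create_sqla_log_handler() to an in-memory SQLite database; the 'prefix' extra mimics what the
PrefixedLogger introduced in common.py further down injects, since _SQLAlchemyHandler.emit()
reads record.prefix:

# Illustrative only -- assumes the aria package from this branch is importable.
import logging

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

from aria.logger import create_sqla_log_handler

engine = create_engine('sqlite:///:memory:')
session = sessionmaker(bind=engine)()

logger = logging.getLogger('aria.example')
logger.setLevel(logging.DEBUG)
logger.addHandler(create_sqla_log_handler(session=session, engine=engine))

# emit() expects a 'prefix' attribute on the record, normally supplied by PrefixedLogger
logger.info('starting operation', extra={'prefix': 'node_1'})

After the call, the record should be visible as a row in the 'log' table through the same session.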
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 37482cf..902598e 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -15,19 +15,33 @@
 """
 A common context for both workflow and operation
 """
+from contextlib import contextmanager
+from functools import partial
 from uuid import uuid4
+import logging
 
 import jinja2
 
-from aria import logger
+from aria import logger as aria_logger
 from aria.storage import exceptions
 
 
-class BaseContext(logger.LoggerMixin):
+class BaseContext(aria_logger.LoggerMixin):
     """
     Base context object for workflow and operation
     """
+    class PrefixedLogger(object):
+        def __init__(self, logger, prefix=''):
+            self._logger = logger
+            self._prefix = prefix
+
+        def __getattr__(self, item):
+            if item.upper() in logging._levelNames:
+                return partial(getattr(self._logger, item), extra={'prefix': self._prefix})
+            else:
+                return getattr(self._logger, item)
+
     def __init__(
             self,
             name,
@@ -42,14 +56,38 @@ class BaseContext(logger.LoggerMixin):
         self._model = model_storage
         self._resource = resource_storage
         self._service_instance_id = service_instance_id
+        # self._logger = self._init_logger(ctx_logger)
         self._workdir = workdir
 
+    def _get_sqla_handler(self):
+        api_kwargs = self._model._initiator(**self._model._initiator_kwargs)
+        api_kwargs.update(**self._model._api_kwargs)
+        return aria_logger.create_sqla_log_handler(**api_kwargs)
+
     def __repr__(self):
         return (
             '{name}(name={self.name}, '
             'deployment_id={self._service_instance_id}, '
             .format(name=self.__class__.__name__, self=self))
 
+    @contextmanager
+    def self_logging(self, handlers=None):
+        handlers = set(handlers) if handlers else set()
+        try:
+            handlers.add(aria_logger.create_console_log_handler())
+            handlers.add(self._get_sqla_handler())
+            for handler in handlers:
+                self.logger.addHandler(handler)
+            self.logger = self.PrefixedLogger(self.logger, self.logging_id)
+            yield self.logger
+        finally:
+            for handler in handlers:
+                self.logger.removeHandler(handler)
+
+    @property
+    def logging_id(self):
+        raise NotImplementedError
+
     @property
     def model(self):
         """
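The PrefixedLogger above forwards the standard level methods (info, debug, etc.) through
functools.partial so that every call automatically carries extra={'prefix': ...}; both the
console format ('@%(prefix)s') and the SQLAlchemy handler rely on that record attribute. Here is
a standalone sketch of the same "bind the extra once" idea, using made-up names and no ARIA code:

# Standalone illustration of the prefix-via-extra pattern.
import logging
from functools import partial

logging.basicConfig(format='%(asctime)s: [%(levelname)s] @%(prefix)s ->%(message)s')
log = logging.getLogger('prefix_example')
log.setLevel(logging.INFO)

# Bind the prefix once instead of repeating extra={...} on every call
info = partial(log.info, extra={'prefix': 'node_1'})
info('this record carries a prefix attribute')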
""" + + @property + def logging_id(self): + return '{0}:{1}'.format(self.source_node.name or self.source_node.id, + self.target_node.name or self.target_node.id) + @property def source_node_template(self): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index 00ed974..63ece3a 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -65,6 +65,10 @@ class WorkflowContext(BaseContext): return execution.id @property + def logging_id(self): + return self._workflow_name + + @property def execution(self): """ The execution model http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/decorators.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py index bbe43f4..f915813 100644 --- a/aria/orchestrator/decorators.py +++ b/aria/orchestrator/decorators.py @@ -54,12 +54,16 @@ def workflow(func=None, suffix_template=''): return _wrapper -def operation(func=None, toolbelt=False, suffix_template=''): +def operation(func=None, toolbelt=False, suffix_template='', logging_handlers=None): """ Operation decorator """ + if func is None: - return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt) + return partial(operation, + suffix_template=suffix_template, + toolbelt=toolbelt, + logging_handlers=logging_handlers) @wraps(func) def _wrapper(**func_kwargs): @@ -67,7 +71,9 @@ def operation(func=None, toolbelt=False, suffix_template=''): operation_toolbelt = context.toolbelt(func_kwargs['ctx']) func_kwargs.setdefault('toolbelt', operation_toolbelt) validate_function_arguments(func, func_kwargs) - return func(**func_kwargs) + + with func_kwargs['ctx'].self_logging(handlers=logging_handlers): + return func(**func_kwargs) return _wrapper http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py index 8927de9..bb92d1c 100644 --- a/aria/orchestrator/runner.py +++ b/aria/orchestrator/runner.py @@ -33,9 +33,6 @@ from .. import ( ) -SQLITE_IN_MEMORY = 'sqlite:///:memory:' - - class Runner(object): """ Runs workflows on a deployment. 
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py
index 8927de9..bb92d1c 100644
--- a/aria/orchestrator/runner.py
+++ b/aria/orchestrator/runner.py
@@ -33,9 +33,6 @@ from .. import (
 )
 
 
-SQLITE_IN_MEMORY = 'sqlite:///:memory:'
-
-
 class Runner(object):
     """
     Runs workflows on a deployment. By default uses temporary storage (either on disk or in memory)
@@ -97,7 +94,6 @@ class Runner(object):
             task_max_attempts=1,
             task_retry_interval=1)
 
-
     def cleanup(self):
         if (self._is_storage_temporary and (self._storage_path is not None) and
                 os.path.isfile(self._storage_path)):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index d0e1363..e9a6d94 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -23,7 +23,6 @@ from functools import (
     wraps,
 )
 
-from aria import logger
 from aria.storage.modeling import model
 from aria.orchestrator.context import operation as operation_context
 
@@ -42,7 +41,7 @@ def _locked(func=None):
     return _wrapper
 
 
-class BaseTask(logger.LoggerMixin):
+class BaseTask(object):
     """
     Base class for Task objects
     """
@@ -151,6 +150,14 @@ class OperationTask(BaseTask):
         self._task_id = operation_task.id
         self._update_fields = None
 
+    @property
+    def self_logging(self):
+        return self._ctx.self_logging
+
+    @property
+    def logger(self):
+        return self._ctx.logger
+
     @contextmanager
     def _update(self):
         """
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 75bbbce..991fec6 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -200,7 +200,9 @@ class ProcessExecutor(base.BaseExecutor):
                 request_handler = self._request_handlers.get(request_type)
                 if not request_handler:
                     raise RuntimeError('Invalid request type: {0}'.format(request_type))
-                request_handler(task_id=request['task_id'], request=request, response=response)
+                task_id = request['task_id']
+                # with self._tasks[task_id].self_logging():
+                request_handler(task_id=task_id, request=request, response=response)
             except BaseException as e:
                 self.logger.debug('Error in process executor listener: {0}'.format(e))
 
@@ -251,23 +253,35 @@ class ProcessExecutor(base.BaseExecutor):
 
 
 def _send_message(connection, message):
+
+    # Packing the length of the entire msg using struct.pack.
+    # This enables later reading of the content.
+    def _pack(data):
+        return struct.pack(_INT_FMT, len(data))
+
     data = jsonpickle.dumps(message)
-    connection.send(struct.pack(_INT_FMT, len(data)))
+    msg_metadata = _pack(data)
+    connection.send(msg_metadata)
     connection.sendall(data)
 
 
 def _recv_message(connection):
-    message_len, = struct.unpack(_INT_FMT, _recv_bytes(connection, _INT_SIZE))
-    return jsonpickle.loads(_recv_bytes(connection, message_len))
+    # Retrieving the length of the msg to come.
+    def _unpack(conn):
+        return struct.unpack(_INT_FMT, _recv_bytes(conn, _INT_SIZE, wait=True))[0]
+
+    msg_metadata_len = _unpack(connection)
+    msg = _recv_bytes(connection, msg_metadata_len)
+    return jsonpickle.loads(msg)
 
 
-def _recv_bytes(connection, count):
+def _recv_bytes(connection, count, wait=False):
     result = io.BytesIO()
     while True:
         if not count:
             return result.getvalue()
         read = connection.recv(count)
-        if not read:
+        if not wait and not read:
             return result.getvalue()
         result.write(read)
         count -= len(read)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 7ae0217..6c59986 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -63,5 +63,5 @@ class ThreadExecutor(BaseExecutor):
                 except BaseException as e:
                     self._task_failed(task, exception=e)
             # Daemon threads
-            except BaseException:
+            except BaseException as e:
                 pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage/__init__.py
----------------------------------------------------------------------
diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py
index eaadc7e..45af1be 100644
--- a/aria/storage/__init__.py
+++ b/aria/storage/__init__.py
@@ -42,12 +42,6 @@ from .core import (
     ModelStorage,
     ResourceStorage,
 )
-from .modeling import (
-    structure,
-    model,
-    model_base,
-    type
-)
 from . import (
     exceptions,
     api,
@@ -58,14 +52,10 @@ from .core import (
 
 __all__ = (
     'exceptions',
-    'structure',
     'Storage',
     'ModelStorage',
     'ResourceStorage',
     'filesystem_rapi',
     'sql_mapi',
     'api',
-    'model',
-    'model_base',
-    'type',
 )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index 0e189e6..7d70070 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -84,6 +84,12 @@ class Storage(LoggerMixin):
         self.logger.debug('{name} object is ready: {0!r}'.format(
             self, name=self.__class__.__name__))
 
+    @property
+    def all_api_kwargs(self):
+        kwargs = self._api_kwargs.copy()
+        kwargs.update(self._additional_api_kwargs)
+        return kwargs
+
     def __repr__(self):
         return '{name}(api={self.api})'.format(name=self.__class__.__name__, self=self)
 
@@ -121,9 +127,7 @@ class ResourceStorage(Storage):
         :param name:
         :return:
         """
-        kwargs = self._api_kwargs.copy()
-        kwargs.update(self._additional_api_kwargs)
-        self.registered[name] = self.api(name=name, **kwargs)
+        self.registered[name] = self.api(name=name, **self.all_api_kwargs)
         self.registered[name].create()
         self.logger.debug('setup {name} in storage {self!r}'.format(name=name, self=self))
 
@@ -148,9 +152,9 @@ class ModelStorage(Storage):
             self.logger.debug('{name} in already storage {self!r}'.format(name=model_name,
                                                                            self=self))
             return
-        kwargs = self._api_kwargs.copy()
-        kwargs.update(self._additional_api_kwargs)
-        self.registered[model_name] = self.api(name=model_name, model_cls=model_cls, **kwargs)
+        self.registered[model_name] = self.api(name=model_name,
+                                               model_cls=model_cls,
+                                               **self.all_api_kwargs)
         self.registered[model_name].create()
         self.logger.debug('setup {name} in storage {self!r}'.format(name=model_name, self=self))
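The executor's IPC helpers above now frame every message explicitly: _send_message() sends a
struct-packed length header before the payload, and _recv_message() first reads that fixed-size
header (blocking on it via wait=True) and then exactly that many bytes. Below is a rough
standalone sketch of the same length-prefixed framing over a local socket pair; it is
illustrative only -- ARIA's actual code serializes with jsonpickle and uses its own _INT_FMT,
and socketpair() is POSIX-only:

# Minimal length-prefixed framing over a socket pair (illustrative only).
import socket
import struct

INT_FMT = '!I'                      # assumed header format; ARIA defines its own _INT_FMT
INT_SIZE = struct.calcsize(INT_FMT)


def send_message(conn, payload):
    conn.sendall(struct.pack(INT_FMT, len(payload)) + payload)


def recv_exactly(conn, count):
    data = b''
    while len(data) < count:
        chunk = conn.recv(count - len(data))
        if not chunk:
            raise EOFError('connection closed mid-message')
        data += chunk
    return data


def recv_message(conn):
    (length,) = struct.unpack(INT_FMT, recv_exactly(conn, INT_SIZE))
    return recv_exactly(conn, length)


if __name__ == '__main__':
    first, second = socket.socketpair()
    send_message(first, b'{"task_id": 42}')
    print(recv_message(second))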
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage/modeling/model.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/model.py b/aria/storage/modeling/model.py
index 62b90b3..cf7d933 100644
--- a/aria/storage/modeling/model.py
+++ b/aria/storage/modeling/model.py
@@ -216,4 +216,8 @@ class Plugin(aria_declarative_base, orchestrator_elements.PluginBase):
 class Task(aria_declarative_base, orchestrator_elements.TaskBase):
     pass
+
+
+class Log(aria_declarative_base, orchestrator_elements.LogBase):
+    pass
 
 # endregion

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage/modeling/orchestrator_elements.py
----------------------------------------------------------------------
diff --git a/aria/storage/modeling/orchestrator_elements.py b/aria/storage/modeling/orchestrator_elements.py
index 5f7a3f2..47fe49f 100644
--- a/aria/storage/modeling/orchestrator_elements.py
+++ b/aria/storage/modeling/orchestrator_elements.py
@@ -466,3 +466,18 @@ class TaskBase(ModelMixin):
     @staticmethod
     def retry(message=None, retry_interval=None):
         raise TaskRetryException(message, retry_interval=retry_interval)
+
+
+class LogBase(ModelMixin):
+    __tablename__ = 'log'
+
+    logger = Column(String)
+    level = Column(String)
+    msg = Column(String)
+    created_at = Column(DateTime, index=True)
+    prefix = Column(String)
+    description = Column(String)
+
+    def __repr__(self):
+        return "{self.created_at}: [{self.level}] @{self.prefix} ->{msg}".format(
+            self=self, msg=self.msg[:50])

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
index 2711aee..8bbec54 100644
--- a/aria/storage/sql_mapi.py
+++ b/aria/storage/sql_mapi.py
@@ -145,6 +145,7 @@ class SQLAlchemyModelAPI(api.ModelAPI):
     def create(self, checkfirst=True, create_all=True, **kwargs):
         self.model_cls.__table__.create(self._engine, checkfirst=checkfirst)
+
         if create_all:
             # In order to create any models created dynamically (e.g. many-to-many helper tables are
             # created at runtime).

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/aria/storage_initializer.py
----------------------------------------------------------------------
diff --git a/aria/storage_initializer.py b/aria/storage_initializer.py
index 175ec22..8c154df 100644
--- a/aria/storage_initializer.py
+++ b/aria/storage_initializer.py
@@ -16,7 +16,7 @@
 from datetime import datetime
 from threading import RLock
 
-from .storage import model
+from .storage.modeling import model
 from .orchestrator import operation
 from .utils.formatting import safe_repr
 from .utils.console import puts, Colored

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/mock/topology.py
----------------------------------------------------------------------
diff --git a/tests/mock/topology.py b/tests/mock/topology.py
index b04fb46..d3e8b7b 100644
--- a/tests/mock/topology.py
+++ b/tests/mock/topology.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from aria.storage import model
+from aria.storage.modeling import model
 
 from . import models
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index 3f39979..c2f5fd0 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -15,16 +15,21 @@
 
 import os
+import logging
+import tempfile
+
 import pytest
 
+from aria.orchestrator.workflows.executor import process, thread
+
 from aria import (
     workflow,
     operation,
 )
 from aria.orchestrator import context
 from aria.orchestrator.workflows import api
-from aria.orchestrator.workflows.executor import thread
 
+import tests
 from tests import mock, storage
 from . import (
     op_path,
@@ -38,16 +43,27 @@ global_test_holder = {}
 
 @pytest.fixture
 def ctx(tmpdir):
     context = mock.context.simple(
-        str(tmpdir.join('workdir')),
-        inmemory=True,
+        str(tmpdir),
         context_kwargs=dict(workdir=str(tmpdir.join('workdir')))
     )
     yield context
     storage.release_sqlite_storage(context.model)
 
 
+tmp_file = os.path.join(tempfile.gettempdir(), 'op_testing_const_file_name')
+
+
+@pytest.fixture(autouse=True)
+def tmpfile_cleanup():
+    try:
+        yield
+    finally:
+        if os.path.isfile(tmp_file):
+            os.remove(tmp_file)
+
+
 @pytest.fixture
-def executor():
+def thread_executor():
     result = thread.ThreadExecutor()
     try:
         yield result
@@ -55,13 +71,23 @@ def executor():
         result.close()
 
 
-def test_node_operation_task_execution(ctx, executor):
+@pytest.fixture(params=[(thread.ThreadExecutor()),
+                        (process.ProcessExecutor(python_path=tests.ROOT_DIR))])
+def executor(request):
+    ex = request.param
+    try:
+        yield ex
+    finally:
+        ex.close()
+
+
+def test_node_operation_task_execution(ctx, thread_executor):
     operation_name = 'aria.interfaces.lifecycle.create'
     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
 
     interface = mock.models.get_interface(
         operation_name,
-        operation_kwargs=dict(implementation=op_path(my_operation, module_path=__name__))
+        operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__))
     )
     node.interfaces = [interface]
     ctx.model.node.update(node)
@@ -77,7 +103,7 @@ def test_node_operation_task_execution(ctx, executor):
         )
     )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
 
     operation_context = global_test_holder[op_name(node, operation_name)]
 
@@ -96,13 +122,13 @@ def test_node_operation_task_execution(ctx, executor):
     assert operation_context.node == node
 
 
-def test_relationship_operation_task_execution(ctx, executor):
+def test_relationship_operation_task_execution(ctx, thread_executor):
     operation_name = 'aria.interfaces.relationship_lifecycle.post_configure'
     relationship = ctx.model.relationship.list()[0]
 
     interface = mock.models.get_interface(
         operation_name=operation_name,
-        operation_kwargs=dict(implementation=op_path(my_operation, module_path=__name__)),
+        operation_kwargs=dict(implementation=op_path(basic_operation, module_path=__name__)),
         edge='source'
     )
 
@@ -121,7 +147,7 @@ def test_relationship_operation_task_execution(ctx, executor):
         )
     )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
 
     operation_context = global_test_holder[op_name(relationship, operation_name)]
@@ -148,12 +174,12 @@ def test_relationship_operation_task_execution(ctx, executor):
 
     assert operation_context.source_node == dependent_node
 
 
-def test_invalid_task_operation_id(ctx, executor):
+def test_invalid_task_operation_id(ctx, thread_executor):
     """
     Checks that the right id is used. The task created with id == 1, thus running the task on
     node_instance with id == 2. will check that indeed the node_instance uses the correct id.
     :param ctx:
-    :param executor:
+    :param thread_executor:
     :return:
     """
     operation_name = 'aria.interfaces.lifecycle.create'
@@ -174,14 +200,14 @@ def test_invalid_task_operation_id(ctx, executor):
         api.task.OperationTask.node(name=operation_name, instance=node)
     )
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
 
     op_node_instance_id = global_test_holder[op_name(node, operation_name)]
     assert op_node_instance_id == node.id
     assert op_node_instance_id != other_node.id
 
 
-def test_plugin_workdir(ctx, executor, tmpdir):
+def test_plugin_workdir(ctx, thread_executor, tmpdir):
     op = 'test.op'
     plugin_name = 'mock_plugin'
     node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
@@ -203,15 +229,76 @@ def test_plugin_workdir(ctx, executor, tmpdir):
     graph.add_tasks(api.task.OperationTask.node(
         name=op, instance=node, inputs=inputs))
 
-    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=thread_executor)
     expected_file = tmpdir.join('workdir', 'plugins', str(ctx.service_instance.id),
                                 plugin_name,
                                 filename)
     assert expected_file.read() == content
 
 
+def test_operation_logging(ctx, executor):
+    operation_name = 'aria.interfaces.lifecycle.create'
+
+    node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+    interface = mock.models.get_interface(
+        operation_name,
+        operation_kwargs=dict(implementation=op_path(logged_operation, module_path=__name__))
+    )
+    node.interfaces = [interface]
+    ctx.model.node.update(node)
+
+    inputs = {
+        'op_start': 'op_start',
+        'op_end': 'op_end',
+    }
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.add_tasks(
+            api.task.OperationTask.node(
+                name=operation_name,
+                instance=node,
+                inputs=inputs
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+
+    logs = ctx.model.log.list()
+
+    assert len(logs) == 2
+
+    op_start_log = [l for l in logs if inputs['op_start'] in l.msg and l.level.lower() == 'info']
+    assert len(op_start_log) == 1
+    op_start_log = op_start_log[0]
+
+    op_end_log = [l for l in logs if inputs['op_end'] in l.msg and l.level.lower() == 'debug']
+    assert len(op_end_log) == 1
+    op_end_log = op_end_log[0]
+
+    assert op_start_log.created_at < op_end_log.created_at
+
+    with open(tmp_file, 'r') as f:
+        logs = [l.strip() for l in f.readlines()]
+
+    assert inputs['op_start'] in logs
+    assert inputs['op_end'] in logs
+
+
+class MockLogHandler(logging.Handler):
+    def emit(self, record):
+        with open(tmp_file, 'a+') as f:
+            f.write(record.msg + '\n')
+
+
+@operation(logging_handlers=[MockLogHandler()])
+def logged_operation(ctx, **_):
+    ctx.logger.info(ctx.task.inputs['op_start'])
+    ctx.logger.debug(ctx.task.inputs['op_end'])
+
+
 @operation
-def my_operation(ctx, **_):
+def basic_operation(ctx, **_):
     global_test_holder[ctx.name] = ctx

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/context/test_serialize.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_serialize.py b/tests/orchestrator/context/test_serialize.py
index 1fdcb1a..03f9529 100644
--- a/tests/orchestrator/context/test_serialize.py
+++ b/tests/orchestrator/context/test_serialize.py
@@ -58,7 +58,7 @@ def _mock_workflow(ctx, graph):
 def _mock_operation(ctx):
     # We test several things in this operation
     # ctx.task, ctx.node, etc... tell us that the model storage was properly re-created
-    # a correct ctx.task.operation_mapping tells us we kept the correct task_id
+    # a correct ctx.task.implementation tells us we kept the correct task_id
     assert ctx.task.implementation == _operation_mapping()
     # a correct ctx.node.name tells us we kept the correct actor_id
     assert ctx.node.name == mock.models.DEPENDENCY_NODE_INSTANCE_NAME

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/execution_plugin/test_common.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_common.py b/tests/orchestrator/execution_plugin/test_common.py
index 1048c27..151b996 100644
--- a/tests/orchestrator/execution_plugin/test_common.py
+++ b/tests/orchestrator/execution_plugin/test_common.py
@@ -18,7 +18,7 @@ from collections import namedtuple
 import requests
 import pytest
 
-from aria.storage import model
+from aria.storage.modeling import model
 from aria.orchestrator import exceptions
 from aria.orchestrator.execution_plugin import common

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 9e9540f..5224496 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -483,9 +483,6 @@ if __name__ == '__main__':
                 operations.__name__,
                 operations.run_script_locally.__name__))
         )]
-        # node.operations[op] = {
-        #     'operation': '{0}.{1}'.format(operations.__name__,
-        #                                   operations.run_script_locally.__name__)}
         graph.add_tasks(api.task.OperationTask.node(
             instance=node,
             name=op,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index 2e270bb..cf4c7e1 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -24,7 +24,7 @@ import fabric.api
 from fabric.contrib import files
 from fabric import context_managers
 
-from aria.storage import model
+from aria.storage.modeling import model
 from aria.orchestrator import events
 from aria.orchestrator import workflow
 from aria.orchestrator.workflows import api
@@ -413,6 +413,12 @@ class TestFabricEnvHideGroupsAndRunCommands(object):
         task.runs_on = Stub
         logger = logging.getLogger()
 
+    @staticmethod
+    @contextlib.contextmanager
+    def _mock_self_logging(*args, **kwargs):
+        yield
+    _Ctx.self_logging = _mock_self_logging
+
     @pytest.fixture(autouse=True)
     def _setup(self, mocker):
         self.default_fabric_env = {

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 580bf8b..64dfb66 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -83,7 +83,7 @@ class MockException(Exception):
 class MockContext(object):
 
     def __init__(self, *args, **kwargs):
-        pass
+        self.logger = logging.getLogger()
 
     def __getattr__(self, item):
         if item == 'serialization_dict':
@@ -95,6 +95,10 @@ class MockContext(object):
     def deserialize_from_dict(cls, **kwargs):
         return cls()
 
+    @contextmanager
+    def self_logging(self):
+        yield self.logger
+
 
 class MockTask(object):
 
@@ -124,6 +128,10 @@ class MockTask(object):
     def _update(self):
         yield self
 
+    @contextmanager
+    def self_logging(self):
+        yield self.logger
+
 
 @pytest.fixture(params=[
     (thread.ThreadExecutor, {'pool_size': 1}),

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/orchestrator/workflows/executor/test_process_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py
index e904eb3..1bdc1e4 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor.py
@@ -22,10 +22,8 @@ from contextlib import contextmanager
 import pytest
 
 from aria import application_model_storage
-from aria.storage import (
-    model as aria_model,
-    sql_mapi
-)
+from aria.storage import sql_mapi
+from aria.storage.modeling import model as aria_model
 from aria.orchestrator import (
     events,
     plugin

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/storage/__init__.py
----------------------------------------------------------------------
diff --git a/tests/storage/__init__.py b/tests/storage/__init__.py
index 4278831..5323d01 100644
--- a/tests/storage/__init__.py
+++ b/tests/storage/__init__.py
@@ -12,8 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import os
-import platform
+
 from shutil import rmtree
 from tempfile import mkdtemp
 
@@ -23,19 +22,19 @@ from sqlalchemy import (
     Column,
     Text,
     Integer,
-    pool
+    pool,
+    MetaData
 )
 
-from aria.storage import (
+from aria.storage.modeling import (
     model,
-    type as aria_type,
     structure,
-    modeling
+    type as aria_type
 )
 
 
-class MockModel(model.aria_declarative_base, structure.ModelMixin): #pylint: disable=abstract-method
+class MockModel(structure.ModelMixin, model.aria_declarative_base): #pylint: disable=abstract-method
     __tablename__ = 'mock_model'
     model_dict = Column(aria_type.Dict)
     model_list = Column(aria_type.List)
@@ -58,14 +57,8 @@ def release_sqlite_storage(storage):
     :param storage:
     :return:
     """
-    mapis = storage.registered.values()
-
-    if mapis:
-        for session in set(mapi._session for mapi in mapis):
-            session.rollback()
-            session.close()
-        for engine in set(mapi._engine for mapi in mapis):
-            model.aria_declarative_base.metadata.drop_all(engine)
+    storage.all_api_kwargs['session'].close()
+    MetaData(bind=storage.all_api_kwargs['engine']).drop_all()
 
 
 def init_inmemory_model_storage():

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
index 08d5ae0..7f0eb02 100644
--- a/tests/storage/test_instrumentation.py
+++ b/tests/storage/test_instrumentation.py
@@ -17,13 +17,15 @@ import pytest
 from sqlalchemy import Column, Text, Integer, event
 
 from aria.storage import (
-    structure,
     ModelStorage,
     sql_mapi,
     instrumentation,
     exceptions,
+)
+from aria.storage.modeling import (
+    model,
     type as aria_type,
-    model
+    structure,
 )
 from ..storage import release_sqlite_storage, init_inmemory_model_storage

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/storage/test_structures.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_structures.py b/tests/storage/test_structures.py
index 666256e..27e99d7 100644
--- a/tests/storage/test_structures.py
+++ b/tests/storage/test_structures.py
@@ -21,9 +21,9 @@ from aria.storage import (
     ModelStorage,
     sql_mapi,
     exceptions,
-    type,
     modeling,
 )
+from aria.storage.modeling import type
 
 from ..storage import release_sqlite_storage, structure, init_inmemory_model_storage
 from . import MockModel

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/91496db5/tests/test_logger.py
----------------------------------------------------------------------
diff --git a/tests/test_logger.py b/tests/test_logger.py
index 70f08bb..1ad055c 100644
--- a/tests/test_logger.py
+++ b/tests/test_logger.py
@@ -53,7 +53,8 @@ def test_create_console_log_handler(capsys):
     logger.info(info_test_string)
     logger.debug(debug_test_string)
     _, err = capsys.readouterr()
-    assert err.count('DEBUG: {test_string}'.format(test_string=debug_test_string)) == 1
+
+    assert err.count('[DEBUG] {test_string}'.format(test_string=debug_test_string))
     assert err.count(info_test_string) == 1
 
     # Custom handler