ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: NullPool logging messages appear during execution [Forced Update!]
Date Wed, 24 May 2017 09:15:18 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution 833d31b3c -> 0c986842d
(forced update)


NullPool logging messages appear during execution


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0c986842
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0c986842
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0c986842

Branch: refs/heads/ARIA-185-NullPool-logging-messages-appear-during-execution
Commit: 0c986842d52eca823ab92442dd9d77267e369ae8
Parents: 3d22d36
Author: max-orlov <maxim@gigaspaces.com>
Authored: Mon May 22 18:28:12 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Wed May 24 12:15:13 2017 +0300

----------------------------------------------------------------------
 aria/orchestrator/context/common.py             |  1 -
 aria/orchestrator/context/operation.py          | 13 +++++--
 .../execution_plugin/ctx_proxy/client.py        | 22 ++++++------
 .../execution_plugin/ctx_proxy/server.py        | 37 +++++++++++++-------
 aria/orchestrator/workflows/executor/process.py |  2 +-
 .../execution_plugin/test_ctx_proxy_server.py   |  4 ++-
 .../orchestrator/workflows/executor/__init__.py |  2 +-
 7 files changed, 52 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 0854a27..c98e026 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -16,7 +16,6 @@
 """
 A common context for both workflow and operation
 """
-
 import logging
 from contextlib import contextmanager
 from functools import partial

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 68a02aa..0ce790f 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -33,6 +33,7 @@ class BaseOperationContext(BaseContext):
         self._task_id = task_id
         self._actor_id = actor_id
         self._thread_local = threading.local()
+        self._destroy_session = kwargs.pop('destroy_session', False)
         logger_level = kwargs.pop('logger_level', None)
         super(BaseOperationContext, self).__init__(**kwargs)
         self._register_logger(task_id=self.task.id, level=logger_level)
@@ -90,13 +91,21 @@ class BaseOperationContext(BaseContext):
         }
 
     @classmethod
-    def deserialize_from_dict(cls, model_storage=None, resource_storage=None, **kwargs):
+    def instantiate_from_dict(cls, model_storage=None, resource_storage=None, **kwargs):
         if model_storage:
             model_storage = aria.application_model_storage(**model_storage)
         if resource_storage:
             resource_storage = aria.application_resource_storage(**resource_storage)
 
-        return cls(model_storage=model_storage, resource_storage=resource_storage, **kwargs)
+        return cls(model_storage=model_storage,
+                   resource_storage=resource_storage,
+                   destroy_session=True,
+                   **kwargs)
+
+    def close(self):
+        if self._destroy_session:
+            self.model.log._session.remove()
+            self.model.log._engine.dispose()
 
 
 class NodeOperationContext(BaseOperationContext):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/aria/orchestrator/execution_plugin/ctx_proxy/client.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/client.py b/aria/orchestrator/execution_plugin/ctx_proxy/client.py
index d965a5e..f7f56aa 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/client.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/client.py
@@ -34,22 +34,25 @@ class _RequestError(RuntimeError):
         self.ex_traceback = ex_traceback
 
 
-def _http_request(socket_url, request, timeout):
-    response = urllib2.urlopen(
-        url=socket_url,
-        data=json.dumps(request),
-        timeout=timeout)
+def _http_request(socket_url, request, method, timeout):
+    opener = urllib2.build_opener(urllib2.HTTPHandler)
+    request = urllib2.Request(socket_url, data=json.dumps(request))
+    request.get_method = lambda: method
+    response = opener.open(request, timeout=timeout)
+
     if response.code != 200:
         raise RuntimeError('Request failed: {0}'.format(response))
     return json.loads(response.read())
 
 
-def _client_request(socket_url, args, timeout):
+def _client_request(socket_url, args, timeout, method='POST'):
     response = _http_request(
         socket_url=socket_url,
         request={'args': args},
-        timeout=timeout)
-    payload = response['payload']
+        method=method,
+        timeout=timeout
+    )
+    payload = response.get('payload')
     response_type = response.get('type')
     if response_type == 'error':
         ex_type = payload['type']
@@ -89,7 +92,7 @@ def _process_args(json_prefix, args):
 def main(args=None):
     args = _parse_args(args)
     response = _client_request(
-        socket_url=args.socket_url,
+        args.socket_url,
         args=_process_args(args.json_arg_prefix, args.args),
         timeout=args.timeout)
     if args.json_output:
@@ -100,6 +103,5 @@ def main(args=None):
         response = str(response)
     sys.stdout.write(response)
 
-
 if __name__ == '__main__':
     main()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 52a5312..1ce0e08 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -42,16 +42,31 @@ class CtxProxy(object):
         self._started.get(timeout=5)
 
     def _start_server(self):
-        proxy = self
 
         class BottleServerAdapter(bottle.ServerAdapter):
+            proxy = self
+
+            def close_session(self):
+                self.proxy.ctx.model.log._session.remove()
+
             def run(self, app):
+
                 class Server(wsgiref.simple_server.WSGIServer):
                     allow_reuse_address = True
+                    bottle_server = self
 
                     def handle_error(self, request, client_address):
                         pass
 
+                    def serve_forever(self, poll_interval=0.5):
+                        try:
+                            wsgiref.simple_server.WSGIServer.serve_forever(self, poll_interval)
+                        finally:
+                            # Once shutdown is called, we need to close the session.
+                            # If the session is not closed properly, it might raise warnings,
+                            # or even lock the database.
+                            self.bottle_server.close_session()
+
                 class Handler(wsgiref.simple_server.WSGIRequestHandler):
                     def address_string(self):
                         return self.client_address[0]
@@ -66,8 +81,8 @@ class CtxProxy(object):
                     app=app,
                     server_class=Server,
                     handler_class=Handler)
-                proxy.server = server
-                proxy._started.put(True)
+                self.proxy.server = server
+                self.proxy._started.put(True)
                 server.serve_forever(poll_interval=0.1)
 
         def serve():
@@ -96,9 +111,10 @@ class CtxProxy(object):
         request = bottle.request.body.read()  # pylint: disable=no-member
         response = self._process(request)
         return bottle.LocalResponse(
-            body=response,
+            body=json.dumps(response, cls=modeling.utils.ModelJSONEncoder),
             status=200,
-            headers={'content-type': 'application/json'})
+            headers={'content-type': 'application/json'}
+        )
 
     def _process(self, request):
         try:
@@ -109,10 +125,7 @@ class CtxProxy(object):
             if isinstance(payload, exceptions.ScriptException):
                 payload = dict(message=str(payload))
                 result_type = 'stop_operation'
-            result = json.dumps({
-                'type': result_type,
-                'payload': payload
-            }, cls=modeling.utils.ModelJSONEncoder)
+            result = {'type': result_type, 'payload': payload}
         except Exception as e:
             traceback_out = StringIO.StringIO()
             traceback.print_exc(file=traceback_out)
@@ -121,10 +134,8 @@ class CtxProxy(object):
                 'message': str(e),
                 'traceback': traceback_out.getvalue()
             }
-            result = json.dumps({
-                'type': 'error',
-                'payload': payload
-            })
+            result = {'type': 'error', 'payload': payload}
+
         return result
 
     def __enter__(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 824c4e1..da6bbb2 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -373,7 +373,7 @@ def _main():
     # See docstring of `remove_mutable_association_listener` for further details
     modeling_types.remove_mutable_association_listener()
     try:
-        ctx = context_dict['context_cls'].deserialize_from_dict(**context_dict['context'])
+        ctx = context_dict['context_cls'].instantiate_from_dict(**context_dict['context'])
     except BaseException as e:
         messenger.failed(exception=e, tracked_changes=None, new_instances=None)
         return

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
index 98ceff9..1b19fd9 100644
--- a/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
+++ b/tests/orchestrator/execution_plugin/test_ctx_proxy_server.py
@@ -136,7 +136,7 @@ class TestCtxProxy(object):
             kwargs=kwargs)
 
     @pytest.fixture
-    def ctx(self):
+    def ctx(self, mocker):
         class MockCtx(object):
             pass
         ctx = MockCtx()
@@ -160,11 +160,13 @@ class TestCtxProxy(object):
         ctx.stub_args = self.stub_args
         ctx.stub_attr = self.StubAttribute()
         ctx.node = self.NodeAttribute(properties)
+        ctx.model = mocker.MagicMock()
         return ctx
 
     @pytest.fixture
     def server(self, ctx):
         result = ctx_proxy.server.CtxProxy(ctx)
+        result._close_session = lambda *args, **kwargs: {}
         yield result
         result.close()
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0c986842/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 8ad8edb..375c44e 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -69,7 +69,7 @@ class MockContext(object):
         return None
 
     @classmethod
-    def deserialize_from_dict(cls, **kwargs):
+    def instantiate_from_dict(cls, **kwargs):
         if kwargs:
             return cls(storage=aria.application_model_storage(**kwargs))
         else:


Mime
View raw message