ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dankil...@apache.org
Subject incubator-ariatosca git commit: ARIA-68 Update runtime properties during operation
Date Tue, 24 Jan 2017 14:58:19 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-68-commit-runtime-properties [created] 5a7e5a205


ARIA-68 Update runtime properties during operation


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/5a7e5a20
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/5a7e5a20
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/5a7e5a20

Branch: refs/heads/ARIA-68-commit-runtime-properties
Commit: 5a7e5a20527805d4e95034a1e0cff6cb48402489
Parents: 0382b22
Author: Dan Kilman <dank@gigaspaces.com>
Authored: Tue Jan 24 16:58:10 2017 +0200
Committer: Dan Kilman <dank@gigaspaces.com>
Committed: Tue Jan 24 16:58:10 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py | 78 +++++++++++++-------
 aria/storage/instrumentation.py                 | 13 +++-
 2 files changed, 61 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5a7e5a20/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 9fa0302..d293a06 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -29,6 +29,7 @@ script_dir = os.path.dirname(__file__)
 if script_dir in sys.path:
     sys.path.remove(script_dir)
 
+import contextlib
 import io
 import threading
 import socket
@@ -136,37 +137,39 @@ class ProcessExecutor(base.BaseExecutor):
         while not self._stopped:
             try:
                 # Accept messages written to the server socket
-                message = self._recv_message()
-                message_type = message['type']
-                if message_type == 'closed':
-                    break
-                task_id = message['task_id']
-                if message_type == 'started':
-                    self._task_started(self._tasks[task_id])
-                elif message_type == 'succeeded':
-                    task = self._remove_task(task_id)
-                    instrumentation.apply_tracked_changes(
-                        tracked_changes=message['tracked_changes'],
-                        model=task.context.model)
-                    self._task_succeeded(task)
-                elif message_type == 'failed':
-                    task = self._remove_task(task_id)
-                    instrumentation.apply_tracked_changes(
-                        tracked_changes=message['tracked_changes'],
-                        model=task.context.model)
-                    self._task_failed(task, exception=message['exception'])
-                else:
-                    raise RuntimeError('Invalid state')
+                with contextlib.closing(self._server_socket.accept()[0]) as connection:
+                    message = self._recv_message(connection)
+                    message_type = message['type']
+                    if message_type == 'closed':
+                        break
+                    task_id = message['task_id']
+                    if message_type == 'started':
+                        self._task_started(self._tasks[task_id])
+                    elif message_type == 'apply_tracked_changes':
+                        task = self._tasks[task_id]
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                    elif message_type == 'succeeded':
+                        task = self._remove_task(task_id)
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                        self._task_succeeded(task)
+                    elif message_type == 'failed':
+                        task = self._remove_task(task_id)
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                        self._task_failed(task, exception=message['exception'])
+                    else:
+                        raise RuntimeError('Invalid state')
             except BaseException as e:
                 self.logger.debug('Error in process executor listener: {0}'.format(e))
 
-    def _recv_message(self):
-        connection, _ = self._server_socket.accept()
-        try:
-            message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE))
-            return jsonpickle.loads(self._recv_bytes(connection, message_len))
-        finally:
-            connection.close()
+    def _recv_message(self, connection):
+        message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE))
+        return jsonpickle.loads(self._recv_bytes(connection, message_len))
 
     @staticmethod
     def _recv_bytes(connection, count):
@@ -247,6 +250,9 @@ class _Messenger(object):
         """Task failed message"""
         self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception)
 
+    def apply_tracked_changes(self, tracked_changes):
+        self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes)
+
     def closed(self):
         """Executor closed message"""
         self._send_message(type='closed')
@@ -263,6 +269,7 @@ class _Messenger(object):
             })
             sock.send(struct.pack(_INT_FMT, len(data)))
             sock.sendall(data)
+            sock.recv(1)
         finally:
             sock.close()
 
@@ -292,6 +299,21 @@ def _main():
     with instrumentation.track_changes() as instrument:
         try:
             ctx = serialization.operation_context_from_dict(context_dict)
+
+            session = ctx.model.node_instance._session
+            original_refresh = session.refresh
+
+            def patched_refresh(target):
+                instrument.clear(target)
+                original_refresh(target)
+
+            def patched_commit():
+                messenger.apply_tracked_changes(instrument.tracked_changes)
+
+            session.autoflush = False
+            session.commit = patched_commit
+            session.refresh = patched_refresh
+
             task_func = imports.load_attribute(operation_mapping)
             aria.install_aria_extensions()
             for decorate in process_executor.decorate():

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/5a7e5a20/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index d71190d..40a1364 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -75,7 +75,7 @@ class _Instrumentation(object):
 
     def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
         def listener(target, value, *_):
-            mapi_name = api.generate_lower_name(target.__class__)
+            mapi_name = self._mapi_name(target.__class__)
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
             if value is None:
@@ -90,7 +90,7 @@ class _Instrumentation(object):
 
     def _register_instance_listeners(self, instrumented_class, instrumented_attributes):
         def listener(target, *_):
-            mapi_name = api.generate_lower_name(instrumented_class)
+            mapi_name = self._mapi_name(instrumented_class)
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
             for attribute_name, attribute_type in instrumented_attributes.items():
@@ -108,6 +108,11 @@ class _Instrumentation(object):
             sqlalchemy.event.listen(*listener_args)
             self.listeners.append(listener_args)
 
+    def clear(self, target):
+        mapi_name = self._mapi_name(target.__class__)
+        tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
+        tracked_instances.pop(target.id, None)
+
     def restore(self):
         """Remove all listeners registered by this instrumentation"""
         for listener_args in self.listeners:
@@ -120,6 +125,10 @@ class _Instrumentation(object):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.restore()
 
+    @staticmethod
+    def _mapi_name(instrumented_class):
+        return api.generate_lower_name(instrumented_class)
+
 
 class _Value(object):
     # You may wonder why is this a full blown class and not a named tuple. The reason is
that


Mime
View raw message