ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dankil...@apache.org
Subject incubator-ariatosca git commit: ARIA-68 Update runtime properties during operation [Forced Update!]
Date Tue, 24 Jan 2017 22:49:01 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-68-commit-runtime-properties 201ef1234 -> bd152bcb8 (forced update)


ARIA-68 Update runtime properties during operation


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/bd152bcb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/bd152bcb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/bd152bcb

Branch: refs/heads/ARIA-68-commit-runtime-properties
Commit: bd152bcb8b709737aac30ef85bf7dddf6ae1d9e1
Parents: 0382b22
Author: Dan Kilman <dank@gigaspaces.com>
Authored: Tue Jan 24 16:58:10 2017 +0200
Committer: Dan Kilman <dank@gigaspaces.com>
Committed: Wed Jan 25 00:47:11 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py | 83 +++++++++++++-------
 aria/storage/instrumentation.py                 | 18 ++++-
 tests/storage/test_instrumentation.py           | 38 +++++++++
 3 files changed, 109 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bd152bcb/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index 9fa0302..0667e7c 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -29,6 +29,7 @@ script_dir = os.path.dirname(__file__)
 if script_dir in sys.path:
     sys.path.remove(script_dir)
 
+import contextlib
 import io
 import threading
 import socket
@@ -136,37 +137,39 @@ class ProcessExecutor(base.BaseExecutor):
         while not self._stopped:
             try:
                 # Accept messages written to the server socket
-                message = self._recv_message()
-                message_type = message['type']
-                if message_type == 'closed':
-                    break
-                task_id = message['task_id']
-                if message_type == 'started':
-                    self._task_started(self._tasks[task_id])
-                elif message_type == 'succeeded':
-                    task = self._remove_task(task_id)
-                    instrumentation.apply_tracked_changes(
-                        tracked_changes=message['tracked_changes'],
-                        model=task.context.model)
-                    self._task_succeeded(task)
-                elif message_type == 'failed':
-                    task = self._remove_task(task_id)
-                    instrumentation.apply_tracked_changes(
-                        tracked_changes=message['tracked_changes'],
-                        model=task.context.model)
-                    self._task_failed(task, exception=message['exception'])
-                else:
-                    raise RuntimeError('Invalid state')
+                with contextlib.closing(self._server_socket.accept()[0]) as connection:
+                    message = self._recv_message(connection)
+                    message_type = message['type']
+                    if message_type == 'closed':
+                        break
+                    task_id = message['task_id']
+                    if message_type == 'started':
+                        self._task_started(self._tasks[task_id])
+                    elif message_type == 'apply_tracked_changes':
+                        task = self._tasks[task_id]
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                    elif message_type == 'succeeded':
+                        task = self._remove_task(task_id)
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                        self._task_succeeded(task)
+                    elif message_type == 'failed':
+                        task = self._remove_task(task_id)
+                        instrumentation.apply_tracked_changes(
+                            tracked_changes=message['tracked_changes'],
+                            model=task.context.model)
+                        self._task_failed(task, exception=message['exception'])
+                    else:
+                        raise RuntimeError('Invalid state')
             except BaseException as e:
                 self.logger.debug('Error in process executor listener: {0}'.format(e))
 
-    def _recv_message(self):
-        connection, _ = self._server_socket.accept()
-        try:
-            message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE))
-            return jsonpickle.loads(self._recv_bytes(connection, message_len))
-        finally:
-            connection.close()
+    def _recv_message(self, connection):
+        message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE))
+        return jsonpickle.loads(self._recv_bytes(connection, message_len))
 
     @staticmethod
     def _recv_bytes(connection, count):
@@ -247,6 +250,9 @@ class _Messenger(object):
         """Task failed message"""
         self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception)
 
+    def apply_tracked_changes(self, tracked_changes):
+        self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes)
+
     def closed(self):
         """Executor closed message"""
         self._send_message(type='closed')
@@ -263,10 +269,27 @@ class _Messenger(object):
             })
             sock.send(struct.pack(_INT_FMT, len(data)))
             sock.sendall(data)
+            sock.recv(1)
         finally:
             sock.close()
 
 
+def _patch_session(session, messenger, instrument):
+    original_refresh = session.refresh
+
+    def patched_refresh(target):
+        instrument.clear(target)
+        original_refresh(target)
+
+    def patched_commit():
+        messenger.apply_tracked_changes(instrument.tracked_changes)
+        instrument.clear()
+
+    session.autoflush = False
+    session.commit = patched_commit
+    session.refresh = patched_refresh
+
+
 def _main():
     arguments_json_path = sys.argv[1]
     with open(arguments_json_path) as f:
@@ -292,6 +315,10 @@ def _main():
     with instrumentation.track_changes() as instrument:
         try:
             ctx = serialization.operation_context_from_dict(context_dict)
+            _patch_session(
+                session=ctx.model.node_instance._session,
+                messenger=messenger,
+                instrument=instrument)
             task_func = imports.load_attribute(operation_mapping)
             aria.install_aria_extensions()
             for decorate in process_executor.decorate():

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bd152bcb/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index d71190d..537dbb5 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -75,7 +75,7 @@ class _Instrumentation(object):
 
     def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
         def listener(target, value, *_):
-            mapi_name = api.generate_lower_name(target.__class__)
+            mapi_name = self._mapi_name(target.__class__)
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
             if value is None:
@@ -90,7 +90,7 @@ class _Instrumentation(object):
 
     def _register_instance_listeners(self, instrumented_class, instrumented_attributes):
         def listener(target, *_):
-            mapi_name = api.generate_lower_name(instrumented_class)
+            mapi_name = self._mapi_name(instrumented_class)
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
             tracked_attributes = tracked_instances.setdefault(target.id, {})
             for attribute_name, attribute_type in instrumented_attributes.items():
@@ -108,6 +108,14 @@ class _Instrumentation(object):
             sqlalchemy.event.listen(*listener_args)
             self.listeners.append(listener_args)
 
+    def clear(self, target=None):
+        if target:
+            mapi_name = self._mapi_name(target.__class__)
+            tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
+            tracked_instances.pop(target.id, None)
+        else:
+            self.tracked_changes.clear()
+
     def restore(self):
         """Remove all listeners registered by this instrumentation"""
         for listener_args in self.listeners:
@@ -120,6 +128,10 @@ class _Instrumentation(object):
     def __exit__(self, exc_type, exc_val, exc_tb):
         self.restore()
 
+    @staticmethod
+    def _mapi_name(instrumented_class):
+        return api.generate_lower_name(instrumented_class)
+
 
 class _Value(object):
     # You may wonder why is this a full blown class and not a named tuple. The reason is
that
@@ -155,3 +167,5 @@ def apply_tracked_changes(tracked_changes, model):
                     if not instance:
                         instance = mapi.get(instance_id)
                     setattr(instance, attribute_name, value.current)
+            if instance:
+                mapi.update(instance)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bd152bcb/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
index 8e7f9aa..8b826e9 100644
--- a/tests/storage/test_instrumentation.py
+++ b/tests/storage/test_instrumentation.py
@@ -232,6 +232,44 @@ class TestInstrumentation(object):
         assert instance2_1.dict1 == {'initial': 'value', 'new': 'value'}
         assert instance2_2.list1 == ['initial', 'new_value']
 
+    def test_clear_instance(self, storage):
+        instance1 = MockModel1(name='name1')
+        instance2 = MockModel1(name='name2')
+        for instance in [instance1, instance2]:
+            storage.mock_model_1.put(instance)
+        instrument = self._track_changes({MockModel1.dict1: dict})
+        instance1.dict1 = {'new': 'value'}
+        instance2.dict1 = {'new2': 'value2'}
+        assert instrument.tracked_changes == {
+            'mock_model_1': {
+                instance1.id: {'dict1': Value(STUB, {'new': 'value'})},
+                instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})}
+            }
+        }
+        instrument.clear(instance1)
+        assert instrument.tracked_changes == {
+            'mock_model_1': {
+                instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})}
+            }
+        }
+
+    def test_clear_all(self, storage):
+        instance1 = MockModel1(name='name1')
+        instance2 = MockModel1(name='name2')
+        for instance in [instance1, instance2]:
+            storage.mock_model_1.put(instance)
+        instrument = self._track_changes({MockModel1.dict1: dict})
+        instance1.dict1 = {'new': 'value'}
+        instance2.dict1 = {'new2': 'value2'}
+        assert instrument.tracked_changes == {
+            'mock_model_1': {
+                instance1.id: {'dict1': Value(STUB, {'new': 'value'})},
+                instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})}
+            }
+        }
+        instrument.clear()
+        assert instrument.tracked_changes == {}
+
     def _track_changes(self, instrumented):
         instrument = instrumentation.track_changes(instrumented)
         instruments_holder.append(instrument)


Mime
View raw message