Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 40F05200C04 for ; Tue, 24 Jan 2017 23:49:26 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 3F98E160B3E; Tue, 24 Jan 2017 22:49:26 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 3E07B160B38 for ; Tue, 24 Jan 2017 23:49:25 +0100 (CET) Received: (qmail 27446 invoked by uid 500); 24 Jan 2017 22:49:24 -0000 Mailing-List: contact dev-help@ariatosca.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ariatosca.incubator.apache.org Delivered-To: mailing list dev@ariatosca.incubator.apache.org Received: (qmail 27433 invoked by uid 99); 24 Jan 2017 22:49:24 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jan 2017 22:49:24 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id 0A084C0D33 for ; Tue, 24 Jan 2017 22:49:24 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -6.218 X-Spam-Level: X-Spam-Status: No, score=-6.218 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999, URIBL_BLOCKED=0.001] autolearn=disabled Received: from mx1-lw-us.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id 3EzbbwzA4ye3 for ; Tue, 24 Jan 2017 22:49:21 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-us.apache.org (ASF Mail Server at mx1-lw-us.apache.org) with SMTP id 5B0BA5F3BF for ; Tue, 24 Jan 2017 22:49:21 +0000 (UTC) Received: (qmail 26952 invoked by uid 99); 24 Jan 2017 22:49:01 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 24 Jan 2017 22:49:01 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8F6B2DFBAD; Tue, 24 Jan 2017 22:49:01 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: dankilman@apache.org To: dev@ariatosca.incubator.apache.org Message-Id: <0b826ef20c46498e96cc435da67f7abe@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: incubator-ariatosca git commit: ARIA-68 Update runtime properties during operation [Forced Update!] Date: Tue, 24 Jan 2017 22:49:01 +0000 (UTC) archived-at: Tue, 24 Jan 2017 22:49:26 -0000 Repository: incubator-ariatosca Updated Branches: refs/heads/ARIA-68-commit-runtime-properties 201ef1234 -> bd152bcb8 (forced update) ARIA-68 Update runtime properties during operation Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/bd152bcb Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/bd152bcb Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/bd152bcb Branch: refs/heads/ARIA-68-commit-runtime-properties Commit: bd152bcb8b709737aac30ef85bf7dddf6ae1d9e1 Parents: 0382b22 Author: Dan Kilman Authored: Tue Jan 24 16:58:10 2017 +0200 Committer: Dan Kilman Committed: Wed Jan 25 00:47:11 2017 +0200 ---------------------------------------------------------------------- aria/orchestrator/workflows/executor/process.py | 83 +++++++++++++------- aria/storage/instrumentation.py | 18 ++++- tests/storage/test_instrumentation.py | 38 +++++++++ 3 files changed, 109 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bd152bcb/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 9fa0302..0667e7c 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -29,6 +29,7 @@ script_dir = os.path.dirname(__file__) if script_dir in sys.path: sys.path.remove(script_dir) +import contextlib import io import threading import socket @@ -136,37 +137,39 @@ class ProcessExecutor(base.BaseExecutor): while not self._stopped: try: # Accept messages written to the server socket - message = self._recv_message() - message_type = message['type'] - if message_type == 'closed': - break - task_id = message['task_id'] - if message_type == 'started': - self._task_started(self._tasks[task_id]) - elif message_type == 'succeeded': - task = self._remove_task(task_id) - instrumentation.apply_tracked_changes( - tracked_changes=message['tracked_changes'], - model=task.context.model) - self._task_succeeded(task) - elif message_type == 'failed': - task = self._remove_task(task_id) - instrumentation.apply_tracked_changes( - tracked_changes=message['tracked_changes'], - model=task.context.model) - self._task_failed(task, exception=message['exception']) - else: - raise RuntimeError('Invalid state') + with contextlib.closing(self._server_socket.accept()[0]) as connection: + message = self._recv_message(connection) + message_type = message['type'] + if message_type == 'closed': + break + task_id = message['task_id'] + if message_type == 'started': + self._task_started(self._tasks[task_id]) + elif message_type == 'apply_tracked_changes': + task = self._tasks[task_id] + instrumentation.apply_tracked_changes( + tracked_changes=message['tracked_changes'], + model=task.context.model) + elif message_type == 'succeeded': + task = self._remove_task(task_id) + instrumentation.apply_tracked_changes( + tracked_changes=message['tracked_changes'], + model=task.context.model) + self._task_succeeded(task) + elif message_type == 'failed': + task = self._remove_task(task_id) + instrumentation.apply_tracked_changes( + tracked_changes=message['tracked_changes'], + model=task.context.model) + self._task_failed(task, exception=message['exception']) + else: + raise RuntimeError('Invalid state') except BaseException as e: self.logger.debug('Error in process executor listener: {0}'.format(e)) - def _recv_message(self): - connection, _ = self._server_socket.accept() - try: - message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE)) - return jsonpickle.loads(self._recv_bytes(connection, message_len)) - finally: - connection.close() + def _recv_message(self, connection): + message_len, = struct.unpack(_INT_FMT, self._recv_bytes(connection, _INT_SIZE)) + return jsonpickle.loads(self._recv_bytes(connection, message_len)) @staticmethod def _recv_bytes(connection, count): @@ -247,6 +250,9 @@ class _Messenger(object): """Task failed message""" self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception) + def apply_tracked_changes(self, tracked_changes): + self._send_message(type='apply_tracked_changes', tracked_changes=tracked_changes) + def closed(self): """Executor closed message""" self._send_message(type='closed') @@ -263,10 +269,27 @@ class _Messenger(object): }) sock.send(struct.pack(_INT_FMT, len(data))) sock.sendall(data) + sock.recv(1) finally: sock.close() +def _patch_session(session, messenger, instrument): + original_refresh = session.refresh + + def patched_refresh(target): + instrument.clear(target) + original_refresh(target) + + def patched_commit(): + messenger.apply_tracked_changes(instrument.tracked_changes) + instrument.clear() + + session.autoflush = False + session.commit = patched_commit + session.refresh = patched_refresh + + def _main(): arguments_json_path = sys.argv[1] with open(arguments_json_path) as f: @@ -292,6 +315,10 @@ def _main(): with instrumentation.track_changes() as instrument: try: ctx = serialization.operation_context_from_dict(context_dict) + _patch_session( + session=ctx.model.node_instance._session, + messenger=messenger, + instrument=instrument) task_func = imports.load_attribute(operation_mapping) aria.install_aria_extensions() for decorate in process_executor.decorate(): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bd152bcb/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index d71190d..537dbb5 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -75,7 +75,7 @@ class _Instrumentation(object): def _register_set_attribute_listener(self, instrumented_attribute, attribute_type): def listener(target, value, *_): - mapi_name = api.generate_lower_name(target.__class__) + mapi_name = self._mapi_name(target.__class__) tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) tracked_attributes = tracked_instances.setdefault(target.id, {}) if value is None: @@ -90,7 +90,7 @@ class _Instrumentation(object): def _register_instance_listeners(self, instrumented_class, instrumented_attributes): def listener(target, *_): - mapi_name = api.generate_lower_name(instrumented_class) + mapi_name = self._mapi_name(instrumented_class) tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) tracked_attributes = tracked_instances.setdefault(target.id, {}) for attribute_name, attribute_type in instrumented_attributes.items(): @@ -108,6 +108,14 @@ class _Instrumentation(object): sqlalchemy.event.listen(*listener_args) self.listeners.append(listener_args) + def clear(self, target=None): + if target: + mapi_name = self._mapi_name(target.__class__) + tracked_instances = self.tracked_changes.setdefault(mapi_name, {}) + tracked_instances.pop(target.id, None) + else: + self.tracked_changes.clear() + def restore(self): """Remove all listeners registered by this instrumentation""" for listener_args in self.listeners: @@ -120,6 +128,10 @@ class _Instrumentation(object): def __exit__(self, exc_type, exc_val, exc_tb): self.restore() + @staticmethod + def _mapi_name(instrumented_class): + return api.generate_lower_name(instrumented_class) + class _Value(object): # You may wonder why is this a full blown class and not a named tuple. The reason is that @@ -155,3 +167,5 @@ def apply_tracked_changes(tracked_changes, model): if not instance: instance = mapi.get(instance_id) setattr(instance, attribute_name, value.current) + if instance: + mapi.update(instance) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/bd152bcb/tests/storage/test_instrumentation.py ---------------------------------------------------------------------- diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py index 8e7f9aa..8b826e9 100644 --- a/tests/storage/test_instrumentation.py +++ b/tests/storage/test_instrumentation.py @@ -232,6 +232,44 @@ class TestInstrumentation(object): assert instance2_1.dict1 == {'initial': 'value', 'new': 'value'} assert instance2_2.list1 == ['initial', 'new_value'] + def test_clear_instance(self, storage): + instance1 = MockModel1(name='name1') + instance2 = MockModel1(name='name2') + for instance in [instance1, instance2]: + storage.mock_model_1.put(instance) + instrument = self._track_changes({MockModel1.dict1: dict}) + instance1.dict1 = {'new': 'value'} + instance2.dict1 = {'new2': 'value2'} + assert instrument.tracked_changes == { + 'mock_model_1': { + instance1.id: {'dict1': Value(STUB, {'new': 'value'})}, + instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})} + } + } + instrument.clear(instance1) + assert instrument.tracked_changes == { + 'mock_model_1': { + instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})} + } + } + + def test_clear_all(self, storage): + instance1 = MockModel1(name='name1') + instance2 = MockModel1(name='name2') + for instance in [instance1, instance2]: + storage.mock_model_1.put(instance) + instrument = self._track_changes({MockModel1.dict1: dict}) + instance1.dict1 = {'new': 'value'} + instance2.dict1 = {'new2': 'value2'} + assert instrument.tracked_changes == { + 'mock_model_1': { + instance1.id: {'dict1': Value(STUB, {'new': 'value'})}, + instance2.id: {'dict1': Value(STUB, {'new2': 'value2'})} + } + } + instrument.clear() + assert instrument.tracked_changes == {} + def _track_changes(self, instrumented): instrument = instrumentation.track_changes(instrumented) instruments_holder.append(instrument)