ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dankil...@apache.org
Subject incubator-ariatosca git commit: ARIA-63 Implement attribute tracking for subprocesses
Date Mon, 16 Jan 2017 15:17:45 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-63-runtime-properties-modification [created] 62cfb82d7


ARIA-63 Implement attribute tracking for subprocesses


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/62cfb82d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/62cfb82d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/62cfb82d

Branch: refs/heads/ARIA-63-runtime-properties-modification
Commit: 62cfb82d7c34853ecae1c49fb60e5145a2e26a27
Parents: dac4da7
Author: Dan Kilman <dank@gigaspaces.com>
Authored: Sun Jan 15 17:42:23 2017 +0200
Committer: Dan Kilman <dank@gigaspaces.com>
Committed: Mon Jan 16 17:15:46 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/workflows/executor/process.py |  49 ++--
 aria/storage/instrumentation.py                 | 135 +++++++++
 aria/storage/type.py                            |  20 +-
 .../test_process_executor_tracked_changes.py    |  95 +++++++
 tests/storage/test_instrumentation.py           | 274 +++++++++++++++++++
 5 files changed, 554 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62cfb82d/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index e0a8aeb..420cb85 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -42,6 +42,8 @@ import jsonpickle
 from aria.utils import imports
 from aria.orchestrator.workflows.executor import base
 from aria.orchestrator.context import serialization
+from aria.storage import instrumentation
+from aria.storage import type as storage_type
 
 _IS_WIN = os.name == 'nt'
 
@@ -139,10 +141,17 @@ class ProcessExecutor(base.BaseExecutor):
                 if message_type == 'started':
                     self._task_started(self._tasks[task_id])
                 elif message_type == 'succeeded':
-                    self._task_succeeded(self._remove_task(task_id))
+                    task = self._remove_task(task_id)
+                    instrumentation.apply_tracked_changes(
+                        tracked_changes=message['tracked_changes'],
+                        model=task.context.model)
+                    self._task_succeeded(task)
                 elif message_type == 'failed':
-                    self._task_failed(self._remove_task(task_id),
-                                      exception=message['exception'])
+                    task = self._remove_task(task_id)
+                    instrumentation.apply_tracked_changes(
+                        tracked_changes=message['tracked_changes'],
+                        model=task.context.model)
+                    self._task_failed(task, exception=message['exception'])
                 else:
                     raise RuntimeError('Invalid state')
             except BaseException as e:
@@ -227,26 +236,27 @@ class _Messenger(object):
         """Task started message"""
         self._send_message(type='started')
 
-    def succeeded(self):
+    def succeeded(self, tracked_changes):
         """Task succeeded message"""
-        self._send_message(type='succeeded')
+        self._send_message(type='succeeded', tracked_changes=tracked_changes)
 
-    def failed(self, exception):
+    def failed(self, tracked_changes, exception):
         """Task failed message"""
-        self._send_message(type='failed', exception=exception)
+        self._send_message(type='failed', tracked_changes=tracked_changes, exception=exception)
 
     def closed(self):
         """Executor closed message"""
         self._send_message(type='closed')
 
-    def _send_message(self, type, exception=None):
+    def _send_message(self, type, tracked_changes=None, exception=None):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.connect(('localhost', self.port))
         try:
             data = jsonpickle.dumps({
                 'type': type,
                 'task_id': self.task_id,
-                'exception': exception
+                'exception': exception,
+                'tracked_changes': tracked_changes
             })
             sock.send(struct.pack(_INT_FMT, len(data)))
             sock.sendall(data)
@@ -271,13 +281,20 @@ def _main():
     operation_mapping = arguments['operation_mapping']
     operation_inputs = arguments['operation_inputs']
     context_dict = arguments['context']
-    try:
-        ctx = serialization.operation_context_from_dict(context_dict)
-        task_func = imports.load_attribute(operation_mapping)
-        task_func(ctx=ctx, **operation_inputs)
-        messenger.succeeded()
-    except BaseException as e:
-        messenger.failed(exception=e)
+
+    # This must happen before any model class is loaded, because that would trigger
+    # the listener we are trying to remove. Once it is triggered, many other listeners
+    # are will then be registered. At that point, it is too late.
+    storage_type.remove_mutable_association_listener()
+
+    with instrumentation.track_changes() as instrument:
+        try:
+            ctx = serialization.operation_context_from_dict(context_dict)
+            task_func = imports.load_attribute(operation_mapping)
+            task_func(ctx=ctx, **operation_inputs)
+            messenger.succeeded(tracked_changes=instrument.tracked_changes)
+        except BaseException as e:
+            messenger.failed(exception=e, tracked_changes=instrument.tracked_changes)
 
 if __name__ == '__main__':
     _main()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62cfb82d/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
new file mode 100644
index 0000000..f8bedb3
--- /dev/null
+++ b/aria/storage/instrumentation.py
@@ -0,0 +1,135 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import copy
+
+import sqlalchemy.event
+
+from . import api
+from . import model
+
+_STUB = object()
+_INSTRUMENTED = {
+    model.NodeInstance.runtime_properties: dict
+}
+
+
+def track_changes(instrumented=None):
+    instrumented = instrumented or _INSTRUMENTED
+    instrumented_classes = {}
+    tracked_changes = {}
+    listeners = []
+    for instrumented_attribute, attribute_type in instrumented.items():
+        _register_set_attribute_listener(
+            listeners=listeners,
+            tracked_changes=tracked_changes,
+            instrumented_attribute=instrumented_attribute,
+            attribute_type=attribute_type)
+        instrumented_class = instrumented_attribute.parent.entity
+        instrumented_class_attributes = instrumented_classes.setdefault(instrumented_class,
{})
+        instrumented_class_attributes[instrumented_attribute.key] = attribute_type
+    for instrumented_class, instrumented_attributes in instrumented_classes.items():
+        _register_instance_listener(
+            listeners=listeners,
+            tracked_changes=tracked_changes,
+            instrumented_class=instrumented_class,
+            instrumented_attributes=instrumented_attributes)
+    return _Instrumentation(tracked_changes=tracked_changes, listeners=listeners)
+
+
+def _register_set_attribute_listener(listeners,
+                                     tracked_changes,
+                                     instrumented_attribute,
+                                     attribute_type):
+    def listener(target, value, *_):
+        mapi_name = api.generate_lower_name(target.__class__)
+        tracked_instances = tracked_changes.setdefault(mapi_name, {})
+        tracked_attributes = tracked_instances.setdefault(target.id, {})
+        if value is None:
+            current = None
+        else:
+            current = copy.deepcopy(attribute_type(value))
+        tracked_attributes[instrumented_attribute.key] = _Value(_STUB, current)
+        return current
+    listener_args = (instrumented_attribute, 'set', listener)
+    sqlalchemy.event.listen(*listener_args, retval=True)
+    listeners.append(listener_args)
+
+
+def _register_instance_listener(listeners,
+                                tracked_changes,
+                                instrumented_class,
+                                instrumented_attributes):
+    def listener(target, *_):
+        mapi_name = api.generate_lower_name(instrumented_class)
+        tracked_instances = tracked_changes.setdefault(mapi_name, {})
+        tracked_attributes = tracked_instances.setdefault(target.id, {})
+        for attribute_name, attribute_type in instrumented_attributes.items():
+            if attribute_name not in tracked_attributes:
+                initial = getattr(target, attribute_name)
+                if initial is None:
+                    current = None
+                else:
+                    current = copy.deepcopy(attribute_type(initial))
+                tracked_attributes[attribute_name] = _Value(initial, current)
+            target.__dict__[attribute_name] = tracked_attributes[attribute_name].current
+    for listener_args in [(instrumented_class, 'load',          listener),
+                          (instrumented_class, 'refresh',       listener),
+                          (instrumented_class, 'refresh_flush', listener)]:
+        sqlalchemy.event.listen(*listener_args)
+        listeners.append(listener_args)
+
+
+class _Instrumentation(object):
+
+    def __init__(self, tracked_changes, listeners):
+        self.tracked_changes = tracked_changes
+        self.listeners = listeners
+
+    def restore(self):
+        for listener_args in self.listeners:
+            sqlalchemy.event.remove(*listener_args)
+
+    def __enter__(self):
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        self.restore()
+
+
+class _Value(object):
+    def __init__(self, initial, current):
+        self.initial = initial
+        self.current = current
+
+    def __eq__(self, other):
+        if not isinstance(other, _Value):
+            return False
+        return self.initial == other.initial and self.current == other.current
+
+    def __hash__(self):
+        return hash(self.initial) ^ hash(self.current)
+
+
+def apply_tracked_changes(tracked_changes, model):
+    for mapi_name, tracked_instances in tracked_changes.items():
+        mapi = getattr(model, mapi_name)
+        for instance_id, tracked_attributes in tracked_instances.items():
+            instance = None
+            for attribute_name, value in tracked_attributes.items():
+                if value.initial != value.current:
+                    if not instance:
+                        instance = mapi.get(instance_id)
+                    setattr(instance, attribute_name, value.current)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62cfb82d/aria/storage/type.py
----------------------------------------------------------------------
diff --git a/aria/storage/type.py b/aria/storage/type.py
index ab50b0f..ec81b2c 100644
--- a/aria/storage/type.py
+++ b/aria/storage/type.py
@@ -16,7 +16,8 @@ import json
 
 from sqlalchemy import (
     TypeDecorator,
-    VARCHAR
+    VARCHAR,
+    event
 )
 
 from sqlalchemy.ext import mutable
@@ -84,5 +85,18 @@ class _MutableList(mutable.MutableList):
             raise exceptions.StorageError('SQL Storage error: {0}'.format(str(e)))
 
 
-_MutableList.associate_with(List)
-_MutableDict.associate_with(Dict)
+def _mutable_association_listener(mapper, cls):
+    for prop in mapper.column_attrs:
+        column_type = prop.columns[0].type
+        if isinstance(column_type, Dict):
+            _MutableDict.associate_with_attribute(getattr(cls, prop.key))
+        if isinstance(column_type, List):
+            _MutableList.associate_with_attribute(getattr(cls, prop.key))
+
+
+def remove_mutable_association_listener():
+    if event.contains(*_LISTENER_ARGS):
+        event.remove(*_LISTENER_ARGS)
+
+_LISTENER_ARGS = (mutable.mapper, 'mapper_configured', _mutable_association_listener)
+event.listen(*_LISTENER_ARGS)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62cfb82d/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
new file mode 100644
index 0000000..3e6d0c3
--- /dev/null
+++ b/tests/orchestrator/workflows/executor/test_process_executor_tracked_changes.py
@@ -0,0 +1,95 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.orchestrator.workflows import api
+from aria.orchestrator.workflows.core import engine
+from aria.orchestrator.workflows.executor import process
+from aria.orchestrator import workflow, operation
+from aria.orchestrator.workflows import exceptions
+
+
+from tests import mock
+from tests import storage
+
+
+_TEST_RUNTIME_PROPERTIES = {
+    'some': 'values', 'that': 'are', 'most': 'likely', 'only': 'set', 'here': 'yo'
+}
+
+
+def test_track_changes_of_successful_operation(context, executor):
+    _run_workflow(context=context, executor=executor, op_func=_mock_success_operation)
+    _assert_tracked_changes_are_applied(context)
+
+
+def test_track_changes_of_failed_operation(context, executor):
+    with pytest.raises(exceptions.ExecutorException):
+        _run_workflow(context=context, executor=executor, op_func=_mock_fail_operation)
+    _assert_tracked_changes_are_applied(context)
+
+
+def _assert_tracked_changes_are_applied(context):
+    instance = context.model.node_instance.get_by_name(mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+    assert instance.runtime_properties == _TEST_RUNTIME_PROPERTIES
+
+
+def _update_runtime_properties(context):
+    context.node_instance.runtime_properties.clear()
+    context.node_instance.runtime_properties.update(_TEST_RUNTIME_PROPERTIES)
+
+
+def _run_workflow(context, executor, op_func):
+    @workflow
+    def mock_workflow(ctx, graph):
+        node_instance = ctx.model.node_instance.get_by_name(
+            mock.models.DEPENDENCY_NODE_INSTANCE_NAME)
+        node_instance.node.operations['test.op'] = {'operation': _operation_mapping(op_func)}
+        task = api.task.OperationTask.node_instance(instance=node_instance, name='test.op')
+        graph.add_tasks(task)
+        return graph
+    graph = mock_workflow(ctx=context)
+    eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
+    eng.execute()
+
+
+@operation
+def _mock_success_operation(ctx):
+    _update_runtime_properties(ctx)
+
+
+@operation
+def _mock_fail_operation(ctx):
+    _update_runtime_properties(ctx)
+    raise RuntimeError
+
+
+def _operation_mapping(func):
+    return '{name}.{func.__name__}'.format(name=__name__, func=func)
+
+
+@pytest.fixture
+def executor():
+    result = process.ProcessExecutor()
+    yield result
+    result.close()
+
+
+@pytest.fixture
+def context(tmpdir):
+    result = mock.context.simple(storage.get_sqlite_api_kwargs(str(tmpdir)))
+    yield result
+    storage.release_sqlite_storage(result.model)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/62cfb82d/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
new file mode 100644
index 0000000..a84b157
--- /dev/null
+++ b/tests/storage/test_instrumentation.py
@@ -0,0 +1,274 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import pytest
+from sqlalchemy import Column, Text, Integer, event
+
+from aria.storage import (
+    model,
+    structure,
+    type as aria_type,
+    ModelStorage,
+    sql_mapi,
+    instrumentation
+)
+from ..storage import get_sqlite_api_kwargs, release_sqlite_storage
+
+
+STUB = instrumentation._STUB
+Value = instrumentation._Value
+
+
+class TestInstrumentation(object):
+
+    def test_track_changes(self, storage):
+
+        model1_instance = MockModel1(
+            name='name',
+            dict1={'initial': 'value'},
+            dict2={'initial': 'value'},
+            list1=['initial'],
+            list2=['initial'],
+            int1=0,
+            int2=0,
+            string2='string')
+        model2_instance = MockModel2(
+            name='name',
+            dict1={'initial': 'value'},
+            dict2={'initial': 'value'},
+            list1=['initial'],
+            list2=['initial'],
+            int1=0,
+            int2=0,
+            string2='string')
+        storage.mock_model_1.put(model1_instance)
+        storage.mock_model_2.put(model2_instance)
+
+        instrument = instrumentation.track_changes(instrumented={
+            MockModel1.dict1: dict,
+            MockModel1.list1: list,
+            MockModel1.int1: int,
+            MockModel1.string2: str,
+            MockModel2.dict2: dict,
+            MockModel2.list2: list,
+            MockModel2.int2: int,
+            MockModel2.name: str
+        })
+
+        assert instrument.listeners
+        assert not instrument.tracked_changes
+
+        storage_model1_instance = storage.mock_model_1.get(model1_instance.id)
+        storage_model2_instance = storage.mock_model_2.get(model2_instance.id)
+
+        storage_model1_instance.dict1 = {'hello': 'world'}
+        storage_model1_instance.dict2 = {'should': 'not track'}
+        storage_model1_instance.list1 = ['hello']
+        storage_model1_instance.list2 = ['should not track']
+        storage_model1_instance.int1 = 100
+        storage_model1_instance.int2 = 20000
+        storage_model1_instance.name = 'should not track'
+        storage_model1_instance.string2 = 'new_string'
+
+        storage_model2_instance.dict1.update({'should': 'not track'})
+        storage_model2_instance.dict2.update({'hello': 'world'})
+        storage_model2_instance.list1.append('should not track')
+        storage_model2_instance.list2.append('hello')
+        storage_model2_instance.int1 = 100
+        storage_model2_instance.int2 = 20000
+        storage_model2_instance.name = 'new_name'
+        storage_model2_instance.string2 = 'should not track'
+
+        assert instrument.tracked_changes == {
+            'mock_model_1': {
+                model1_instance.id: {
+                    'dict1': Value(STUB, {'hello': 'world'}),
+                    'list1': Value(STUB, ['hello']),
+                    'int1': Value(STUB, 100),
+                    'string2': Value(STUB, 'new_string')
+                }
+            },
+            'mock_model_2': {
+                model2_instance.id: {
+                    'dict2': Value({'initial': 'value'}, {'hello': 'world', 'initial': 'value'}),
+                    'list2': Value(['initial'], ['initial', 'hello']),
+                    'int2': Value(STUB, 20000),
+                    'name': Value(STUB, 'new_name'),
+                }
+            }
+        }
+
+    def test_attribute_initial_none_value(self, storage):
+        instance1 = MockModel1(name='name')
+        instance2 = MockModel1(name='name2')
+        instance1.dict1 = None
+        instance2.dict1 = None
+        storage.mock_model_1.put(instance1)
+        storage.mock_model_1.put(instance2)
+        instrument = instrumentation.track_changes(instrumented={MockModel1.dict1: dict})
+        instance1 = storage.mock_model_1.get(instance1.id)
+        instance2 = storage.mock_model_1.get(instance2.id)
+        instance1.dict1 = {'new': 'value'}
+        assert instrument.tracked_changes == {
+            'mock_model_1': {
+                instance1.id: {'dict1': Value(STUB, {'new': 'value'})},
+                instance2.id: {'dict1': Value(None, None)},
+            }
+        }
+
+    def test_attribute_set_none_value(self, storage):
+        instance = MockModel1(name='name')
+        storage.mock_model_1.put(instance)
+        instrument = instrumentation.track_changes(instrumented={
+            MockModel1.dict1: dict,
+            MockModel1.list1: list,
+            MockModel1.string2: str,
+            MockModel1.int1: int
+        })
+        instance = storage.mock_model_1.get(instance.id)
+        instance.dict1 = None
+        instance.list1 = None
+        instance.string2 = None
+        instance.int1 = None
+        assert instrument.tracked_changes == {
+            'mock_model_1': {
+                instance.id: {
+                    'dict1': Value(STUB, None),
+                    'list1': Value(STUB, None),
+                    'string2': Value(STUB, None),
+                    'int1': Value(STUB, None)
+                }
+            }
+        }
+
+    def test_remove_listeners(self):
+        instrument = instrumentation.track_changes(instrumented={MockModel1.dict1: dict})
+        # set instance attribute, load instance, refresh instance and flush_refresh listeners
+        assert len(instrument.listeners) == 4
+        for listener_args in instrument.listeners:
+            assert event.contains(*listener_args)
+        instrument.restore()
+        assert len(instrument.listeners) == 4
+        for listener_args in instrument.listeners:
+            assert not event.contains(*listener_args)
+
+    def test_instrumentation_context_manager(self, storage):
+        instance = MockModel1(name='name')
+        storage.mock_model_1.put(instance)
+        with instrumentation.track_changes(instrumented={MockModel1.dict1: dict}) as instrument:
+            instance = storage.mock_model_1.get(instance.id)
+            instance.dict1 = {'new': 'value'}
+            assert instrument.tracked_changes == {
+                'mock_model_1': {instance.id: {'dict1': Value(STUB, {'new': 'value'})}}
+            }
+            assert len(instrument.listeners) == 4
+            for listener_args in instrument.listeners:
+                assert event.contains(*listener_args)
+        for listener_args in instrument.listeners:
+            assert not event.contains(*listener_args)
+
+    def test_apply_tracked_changes(self, storage):
+        initial_values = {'dict1': {'initial': 'value'}, 'list1': ['initial']}
+        instance1_1 = MockModel1(name='instance1_1', **initial_values)
+        instance1_2 = MockModel1(name='instance1_2', **initial_values)
+        instance2_1 = MockModel2(name='instance2_1', **initial_values)
+        instance2_2 = MockModel2(name='instance2_2', **initial_values)
+        storage.mock_model_1.put(instance1_1)
+        storage.mock_model_1.put(instance1_2)
+        storage.mock_model_2.put(instance2_1)
+        storage.mock_model_2.put(instance2_2)
+
+        instrument = instrumentation.track_changes(instrumented={
+            MockModel1.dict1: dict,
+            MockModel1.list1: list,
+            MockModel2.dict1: dict,
+            MockModel2.list1: list
+        })
+
+        def get_instances():
+            return (storage.mock_model_1.get(instance1_1.id),
+                    storage.mock_model_1.get(instance1_2.id),
+                    storage.mock_model_2.get(instance2_1.id),
+                    storage.mock_model_2.get(instance2_2.id))
+
+        instance1_1, instance1_2, instance2_1, instance2_2 = get_instances()
+        instance1_1.dict1 = {'new': 'value'}
+        instance1_2.list1 = ['new_value']
+        instance2_1.dict1.update({'new': 'value'})
+        instance2_2.list1.append('new_value')
+
+        instrument.restore()
+        storage.mock_model_1._session.expire_all()
+
+        instance1_1, instance1_2, instance2_1, instance2_2 = get_instances()
+        instance1_1.dict1 = {'overriding': 'value'}
+        instance1_2.list1 = ['overriding_value']
+        instance2_1.dict1 = {'overriding': 'value'}
+        instance2_2.list1 = ['overriding_value']
+        storage.mock_model_1.put(instance1_1)
+        storage.mock_model_1.put(instance1_2)
+        storage.mock_model_2.put(instance2_1)
+        storage.mock_model_2.put(instance2_2)
+        instance1_1, instance1_2, instance2_1, instance2_2 = get_instances()
+        assert instance1_1.dict1 == {'overriding': 'value'}
+        assert instance1_2.list1 == ['overriding_value']
+        assert instance2_1.dict1 == {'overriding': 'value'}
+        assert instance2_2.list1 == ['overriding_value']
+
+        instrumentation.apply_tracked_changes(
+            tracked_changes=instrument.tracked_changes,
+            model=storage)
+
+        instance1_1, instance1_2, instance2_1, instance2_2 = get_instances()
+        assert instance1_1.dict1 == {'new': 'value'}
+        assert instance1_2.list1 == ['new_value']
+        assert instance2_1.dict1 == {'initial': 'value', 'new': 'value'}
+        assert instance2_2.list1 == ['initial', 'new_value']
+
+
+@pytest.fixture
+def storage():
+    aria_type.remove_mutable_association_listener()
+    result = ModelStorage(
+        api_cls=sql_mapi.SQLAlchemyModelAPI,
+        api_kwargs=get_sqlite_api_kwargs(),
+        items=(MockModel1, MockModel2))
+    yield result
+    release_sqlite_storage(result)
+
+
+class MockModel1(model.DeclarativeBase, structure.ModelMixin):
+    __tablename__ = 'mock_model1'
+    name = Column(Text)
+    dict1 = Column(aria_type.Dict)
+    dict2 = Column(aria_type.Dict)
+    list1 = Column(aria_type.List)
+    list2 = Column(aria_type.List)
+    int1 = Column(Integer)
+    int2 = Column(Integer)
+    string2 = Column(Text)
+
+
+class MockModel2(model.DeclarativeBase, structure.ModelMixin):
+    __tablename__ = 'mock_model2'
+    name = Column(Text)
+    dict1 = Column(aria_type.Dict)
+    dict2 = Column(aria_type.Dict)
+    list1 = Column(aria_type.List)
+    list2 = Column(aria_type.List)
+    int1 = Column(Integer)
+    int2 = Column(Integer)
+    string2 = Column(Text)


Mime
View raw message