ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject [2/2] incubator-ariatosca git commit: ARIA-258 Convert runtime_properties to attributes
Date Thu, 25 May 2017 15:30:27 GMT
ARIA-258 Convert runtime_properties to attributes


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/50b997e3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/50b997e3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/50b997e3

Branch: refs/heads/ARIA-258-Convert-runtime-properties-to-attributes
Commit: 50b997e3bfbaf26df5e66327d30fe8a015b92dd7
Parents: 0c98684
Author: max-orlov <maxim@gigaspaces.com>
Authored: Sun May 14 22:38:39 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Thu May 25 18:30:21 2017 +0300

----------------------------------------------------------------------
 aria/cli/commands/nodes.py                      |   6 +-
 aria/modeling/service_common.py                 |   9 +-
 aria/modeling/service_instance.py               |   8 +-
 aria/modeling/service_template.py               |   1 -
 aria/modeling/types.py                          |  20 -
 .../context/collection_instrumentation.py       | 242 ++++++++++++
 aria/orchestrator/context/operation.py          |  13 +-
 aria/orchestrator/context/toolbelt.py           |   5 +-
 .../execution_plugin/ctx_proxy/server.py        |   1 -
 aria/orchestrator/workflows/core/engine.py      |   1 -
 aria/orchestrator/workflows/executor/process.py | 125 +-----
 aria/storage/instrumentation.py                 | 282 -------------
 tests/helpers.py                                |  10 +
 tests/mock/models.py                            |   7 +-
 tests/modeling/test_mixins.py                   |   1 -
 tests/modeling/test_models.py                   |  28 +-
 .../context/test_collection_instrumentation.py  | 253 ++++++++++++
 tests/orchestrator/context/test_operation.py    |  90 ++++-
 tests/orchestrator/context/test_toolbelt.py     |   5 +-
 .../orchestrator/execution_plugin/test_local.py |  66 ++--
 tests/orchestrator/execution_plugin/test_ssh.py |  36 +-
 tests/orchestrator/workflows/core/test_task.py  |   2 +-
 .../orchestrator/workflows/executor/__init__.py |   4 +
 ...process_executor_concurrent_modifications.py |  67 ++--
 .../executor/test_process_executor_extension.py |   6 +-
 .../test_process_executor_tracked_changes.py    |  56 ++-
 tests/resources/scripts/test_ssh.sh             |  30 +-
 tests/storage/test_instrumentation.py           | 396 -------------------
 28 files changed, 786 insertions(+), 984 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/cli/commands/nodes.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands/nodes.py b/aria/cli/commands/nodes.py
index e43493f..1bbefe6 100644
--- a/aria/cli/commands/nodes.py
+++ b/aria/cli/commands/nodes.py
@@ -47,9 +47,9 @@ def show(node_id, model_storage, logger):
 
     # print node attributes
     logger.info('Node attributes:')
-    if node.runtime_properties:
-        for prop_name, prop_value in node.runtime_properties.iteritems():
-            logger.info('\t{0}: {1}'.format(prop_name, prop_value))
+    if node.attributes:
+        for param_name, param in node.attributes.iteritems():
+            logger.info('\t{0}: {1}'.format(param_name, param.value))
     else:
         logger.info('\tNo attributes')
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/modeling/service_common.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_common.py b/aria/modeling/service_common.py
index e9c96a4..ef19c8e 100644
--- a/aria/modeling/service_common.py
+++ b/aria/modeling/service_common.py
@@ -218,14 +218,13 @@ class ParameterBase(TemplateModelMixin, caching.HasCachedMethods):
         :type description: basestring
         """
 
-        from . import models
         type_name = canonical_type_name(value)
         if type_name is None:
             type_name = full_type_name(value)
-        return models.Parameter(name=name, # pylint: disable=unexpected-keyword-arg
-                                type_name=type_name,
-                                value=value,
-                                description=description)
+        return cls(name=name, # pylint: disable=unexpected-keyword-arg
+                   type_name=type_name,
+                   value=value,
+                   description=description)
 
 
 class TypeBase(InstanceModelMixin):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/modeling/service_instance.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_instance.py b/aria/modeling/service_instance.py
index 41a388d..7058969 100644
--- a/aria/modeling/service_instance.py
+++ b/aria/modeling/service_instance.py
@@ -333,8 +333,6 @@ class NodeBase(InstanceModelMixin):
     :vartype inbound_relationships: [:class:`Relationship`]
     :ivar host: Host node (can be self)
     :vartype host: :class:`Node`
-    :ivar runtime_properties: TODO: should be replaced with attributes
-    :vartype runtime_properties: {}
     :ivar state: The state of the node, according to to the TOSCA-defined node states
     :vartype state: string
     :ivar version: Used by `aria.storage.instrumentation`
@@ -520,7 +518,6 @@ class NodeBase(InstanceModelMixin):
     # endregion
 
     description = Column(Text)
-    runtime_properties = Column(modeling_types.Dict)
     state = Column(Enum(*STATES, name='node_state'), nullable=False, default=INITIAL)
     version = Column(Integer, default=1)
 
@@ -528,8 +525,9 @@ class NodeBase(InstanceModelMixin):
 
     @property
     def host_address(self):
-        if self.host and self.host.runtime_properties:
-            return self.host.runtime_properties.get('ip')
+        if self.host and self.host.attributes:
+            attribute = self.host.attributes.get('ip')
+            return attribute.value if attribute else None
         return None
 
     def satisfy_requirements(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/modeling/service_template.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_template.py b/aria/modeling/service_template.py
index 12195a1..3110248 100644
--- a/aria/modeling/service_template.py
+++ b/aria/modeling/service_template.py
@@ -562,7 +562,6 @@ class NodeTemplateBase(TemplateModelMixin):
                            type=self.type,
                            description=deepcopy_with_locators(self.description),
                            state=models.Node.INITIAL,
-                           runtime_properties={},
                            node_template=self)
         utils.instantiate_dict(node, node.properties, self.properties)
         utils.instantiate_dict(node, node.attributes, self.attributes)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/modeling/types.py
----------------------------------------------------------------------
diff --git a/aria/modeling/types.py b/aria/modeling/types.py
index 7460f47..920a0c2 100644
--- a/aria/modeling/types.py
+++ b/aria/modeling/types.py
@@ -286,24 +286,4 @@ _LISTENER_ARGS = (mutable.mapper, 'mapper_configured', _mutable_association_list
 def _register_mutable_association_listener():
     event.listen(*_LISTENER_ARGS)
 
-
-def remove_mutable_association_listener():
-    """
-    Remove the event listener that associates ``Dict`` and ``List`` column types with
-    ``MutableDict`` and ``MutableList``, respectively.
-
-    This call must happen before any model instance is instantiated.
-    This is because once it does, that would trigger the listener we are trying to remove.
-    Once it is triggered, many other listeners will then be registered.
-    At that point, it is too late.
-
-    The reason this function exists is that the association listener, interferes with ARIA change
-    tracking instrumentation, so a way to disable it is required.
-
-    Note that the event listener this call removes is registered by default.
-    """
-    if event.contains(*_LISTENER_ARGS):
-        event.remove(*_LISTENER_ARGS)
-
-
 _register_mutable_association_listener()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/context/collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/collection_instrumentation.py b/aria/orchestrator/context/collection_instrumentation.py
new file mode 100644
index 0000000..91cfd35
--- /dev/null
+++ b/aria/orchestrator/context/collection_instrumentation.py
@@ -0,0 +1,242 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from functools import partial
+
+from aria.modeling import models
+
+
+class _InstrumentedCollection(object):
+
+    def __init__(self,
+                 model,
+                 parent,
+                 field_name,
+                 seq=None,
+                 is_top_level=True,
+                 **kwargs):
+        self._model = model
+        self._parent = parent
+        self._field_name = field_name
+        self._is_top_level = is_top_level
+        self._load(seq, **kwargs)
+
+    @property
+    def _raw(self):
+        raise NotImplementedError
+
+    def _load(self, seq, **kwargs):
+        """
+        Instantiates the object from existing seq.
+
+        :param seq: the original sequence to load from
+        :return:
+        """
+        raise NotImplementedError
+
+    def _set(self, key, value):
+        """
+        set the changes for the current object (not in the db)
+
+        :param key:
+        :param value:
+        :return:
+        """
+        raise NotImplementedError
+
+    def _del(self, collection, key):
+        raise NotImplementedError
+
+    def _instrument(self, key, value):
+        """
+        Instruments any collection to track changes (and ease of access)
+        :param key:
+        :param value:
+        :return:
+        """
+        if isinstance(value, _InstrumentedCollection):
+            return value
+        elif isinstance(value, dict):
+            instrumentation_cls = _InstrumentedDict
+        elif isinstance(value, list):
+            instrumentation_cls = _InstrumentedList
+        else:
+            return value
+
+        return instrumentation_cls(self._model, self, key, value, False)
+
+    @staticmethod
+    def _raw_value(value):
+        """
+        Get the raw value.
+        :param value:
+        :return:
+        """
+        if isinstance(value, models.Parameter):
+            return value.value
+        return value
+
+    @staticmethod
+    def _encapsulate_value(key, value):
+        """
+        Create a new item cls if needed.
+        :param key:
+        :param value:
+        :return:
+        """
+        if isinstance(value, models.Parameter):
+            return value
+        # If it is not wrapped
+        return models.Parameter.wrap(key, value)
+
+    def __setitem__(self, key, value):
+        """
+        Update the values in both the local and the db locations.
+        :param key:
+        :param value:
+        :return:
+        """
+        self._set(key, value)
+        if self._is_top_level:
+            # We are at the top level
+            field = getattr(self._parent, self._field_name)
+            mapi = getattr(self._model, models.Parameter.__modelname__)
+            value = self._set_field(field,
+                                    key,
+                                    value if key in field else self._encapsulate_value(key, value))
+            mapi.update(value)
+        else:
+            # We are not at the top level
+            self._set_field(self._parent, self._field_name, self)
+
+    def _set_field(self, collection, key, value):
+        """
+        enables updating the current change in the ancestors
+        :param collection: the collection to change
+        :param key: the key for the specific field
+        :param value: the new value
+        :return:
+        """
+        if isinstance(value, _InstrumentedCollection):
+            value = value._raw
+        if key in collection and isinstance(collection[key], models.Parameter):
+            if isinstance(collection[key], _InstrumentedCollection):
+                self._del(collection, key)
+            collection[key].value = value
+        else:
+            collection[key] = value
+        return collection[key]
+
+    def __deepcopy__(self, *args, **kwargs):
+        return self._raw
+
+
+class _InstrumentedDict(_InstrumentedCollection, dict):
+
+    def _load(self, dict_=None, **kwargs):
+        dict.__init__(
+            self,
+            tuple((key, self._raw_value(value)) for key, value in (dict_ or {}).items()),
+            **kwargs)
+
+    def update(self, dict_=None, **kwargs):
+        dict_ = dict_ or {}
+        for key, value in dict_.items():
+            self[key] = value
+        for key, value in kwargs.items():
+            self[key] = value
+
+    def __getitem__(self, key):
+        return self._instrument(key, dict.__getitem__(self, key))
+
+    def _set(self, key, value):
+        dict.__setitem__(self, key, self._raw_value(value))
+
+    @property
+    def _raw(self):
+        return dict(self)
+
+    def _del(self, collection, key):
+        del collection[key]
+
+
+class _InstrumentedList(_InstrumentedCollection, list):
+
+    def _load(self, list_=None, **kwargs):
+        list.__init__(self, list(item for item in list_ or []))
+
+    def append(self, value):
+        self.insert(len(self), value)
+
+    def insert(self, index, value):
+        list.insert(self, index, self._raw_value(value))
+        if self._is_top_level:
+            field = getattr(self._parent, self._field_name)
+            field.insert(index, self._encapsulate_value(index, value))
+        else:
+            self._parent[self._field_name] = self
+
+    def __getitem__(self, key):
+        return self._instrument(key, list.__getitem__(self, key))
+
+    def _set(self, key, value):
+        list.__setitem__(self, key, value)
+
+    def _del(self, collection, key):
+        del collection[key]
+
+    @property
+    def _raw(self):
+        return list(self)
+
+
+class _InstrumentedModel(object):
+
+    def __init__(self, field_name, original_model, model_storage):
+        super(_InstrumentedModel, self).__init__()
+        self._field_name = field_name
+        self._model_storage = model_storage
+        self._original_model = original_model
+        self._apply_instrumentation()
+
+    def __getattr__(self, item):
+        return getattr(self._original_model, item)
+
+    def _apply_instrumentation(self):
+
+        field = getattr(self._original_model, self._field_name)
+
+        # Preserve the original value. e.g. original attributes would be located under
+        # _attributes
+        setattr(self, '_{0}'.format(self._field_name), field)
+
+        # set instrumented value
+        setattr(self, self._field_name, _InstrumentedDict(self._model_storage,
+                                                          self._original_model,
+                                                          self._field_name,
+                                                          field))
+
+
+def instrument_collection(field_name, func=None):
+    if func is None:
+        return partial(instrument_collection, field_name)
+
+    def _wrapper(*args, **kwargs):
+        original_model = func(*args, **kwargs)
+        return type('Instrumented{0}'.format(original_model.__class__.__name__),
+                    (_InstrumentedModel, ),
+                    {})(field_name, original_model, args[0].model)
+
+    return _wrapper

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index 0ce790f..7c21351 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -21,10 +21,13 @@ import threading
 
 import aria
 from aria.utils import file
-from .common import BaseContext
+from . import (
+    common,
+    collection_instrumentation
+)
 
 
-class BaseOperationContext(BaseContext):
+class BaseOperationContext(common.BaseContext):
     """
     Context object used during operation creation and execution
     """
@@ -114,6 +117,7 @@ class NodeOperationContext(BaseOperationContext):
     """
 
     @property
+    @collection_instrumentation.instrument_collection('attributes')
     def node_template(self):
         """
         the node of the current operation
@@ -122,6 +126,7 @@ class NodeOperationContext(BaseOperationContext):
         return self.node.node_template
 
     @property
+    @collection_instrumentation.instrument_collection('attributes')
     def node(self):
         """
         The node instance of the current operation
@@ -136,6 +141,7 @@ class RelationshipOperationContext(BaseOperationContext):
     """
 
     @property
+    @collection_instrumentation.instrument_collection('attributes')
     def source_node_template(self):
         """
         The source node
@@ -144,6 +150,7 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.source_node.node_template
 
     @property
+    @collection_instrumentation.instrument_collection('attributes')
     def source_node(self):
         """
         The source node instance
@@ -152,6 +159,7 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.relationship.source_node
 
     @property
+    @collection_instrumentation.instrument_collection('attributes')
     def target_node_template(self):
         """
         The target node
@@ -160,6 +168,7 @@ class RelationshipOperationContext(BaseOperationContext):
         return self.target_node.node_template
 
     @property
+    @collection_instrumentation.instrument_collection('attributes')
     def target_node(self):
         """
         The target node instance

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/context/toolbelt.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py
index def7d42..5788ee7 100644
--- a/aria/orchestrator/context/toolbelt.py
+++ b/aria/orchestrator/context/toolbelt.py
@@ -34,7 +34,10 @@ class NodeToolBelt(object):
         """
         assert isinstance(self._op_context, operation.NodeOperationContext)
         host = self._op_context.node.host
-        return host.runtime_properties.get('ip')
+        ip = host.attributes.get('ip')
+        if ip:
+            return ip.value
+
 
 
 class RelationshipToolBelt(object):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 1ce0e08..102ff9a 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -98,7 +98,6 @@ class CtxProxy(object):
                 quiet=True,
                 server=BottleServerAdapter)
         thread = threading.Thread(target=serve)
-        thread.daemon = True
         thread.start()
         return thread
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index 561265c..3a96804 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -69,7 +69,6 @@ class Engine(logger.LoggerMixin):
             else:
                 events.on_success_workflow_signal.send(self._workflow_context)
         except BaseException as e:
-
             events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
             raise
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index da6bbb2..f02e0a6 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -43,14 +43,12 @@ import jsonpickle
 
 import aria
 from aria.orchestrator.workflows.executor import base
-from aria.storage import instrumentation
 from aria.extension import process_executor
 from aria.utils import (
     imports,
     exceptions,
     process as process_utils
 )
-from aria.modeling import types as modeling_types
 
 
 _INT_FMT = 'I'
@@ -82,7 +80,6 @@ class ProcessExecutor(base.BaseExecutor):
             'started': self._handle_task_started_request,
             'succeeded': self._handle_task_succeeded_request,
             'failed': self._handle_task_failed_request,
-            'apply_tracked_changes': self._handle_apply_tracked_changes_request
         }
 
         # Server socket used to accept task status messages from subprocesses
@@ -196,41 +193,13 @@ class ProcessExecutor(base.BaseExecutor):
     def _handle_task_started_request(self, task_id, **kwargs):
         self._task_started(self._tasks[task_id])
 
-    def _handle_task_succeeded_request(self, task_id, request, **kwargs):
+    def _handle_task_succeeded_request(self, task_id, **kwargs):
         task = self._remove_task(task_id)
-        try:
-            self._apply_tracked_changes(task, request)
-        except BaseException as e:
-            e.message += UPDATE_TRACKED_CHANGES_FAILED_STR
-            self._task_failed(task, exception=e)
-        else:
-            self._task_succeeded(task)
+        self._task_succeeded(task)
 
     def _handle_task_failed_request(self, task_id, request, **kwargs):
         task = self._remove_task(task_id)
-        try:
-            self._apply_tracked_changes(task, request)
-        except BaseException as e:
-            e.message += 'Task failed due to {0}.'.format(request['exception']) + \
-                         UPDATE_TRACKED_CHANGES_FAILED_STR
-            self._task_failed(
-                task, exception=e, traceback=exceptions.get_exception_as_string(*sys.exc_info()))
-        else:
-            self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
-
-    def _handle_apply_tracked_changes_request(self, task_id, request, response):
-        task = self._tasks[task_id]
-        try:
-            self._apply_tracked_changes(task, request)
-        except BaseException as e:
-            response['exception'] = exceptions.wrap_if_needed(e)
-
-    @staticmethod
-    def _apply_tracked_changes(task, request):
-        instrumentation.apply_tracked_changes(
-            tracked_changes=request['tracked_changes'],
-            new_instances=request['new_instances'],
-            model=task.context.model)
+        self._task_failed(task, exception=request['exception'], traceback=request['traceback'])
 
 
 def _send_message(connection, message):
@@ -278,28 +247,19 @@ class _Messenger(object):
         """Task started message"""
         self._send_message(type='started')
 
-    def succeeded(self, tracked_changes, new_instances):
+    def succeeded(self):
         """Task succeeded message"""
-        self._send_message(
-            type='succeeded', tracked_changes=tracked_changes, new_instances=new_instances)
+        self._send_message(type='succeeded')
 
-    def failed(self, tracked_changes, new_instances, exception):
+    def failed(self, exception):
         """Task failed message"""
-        self._send_message(type='failed',
-                           tracked_changes=tracked_changes,
-                           new_instances=new_instances,
-                           exception=exception)
-
-    def apply_tracked_changes(self, tracked_changes, new_instances):
-        self._send_message(type='apply_tracked_changes',
-                           tracked_changes=tracked_changes,
-                           new_instances=new_instances)
+        self._send_message(type='failed', exception=exception)
 
     def closed(self):
         """Executor closed message"""
         self._send_message(type='closed')
 
-    def _send_message(self, type, tracked_changes=None, new_instances=None, exception=None):
+    def _send_message(self, type, exception=None):
         sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
         sock.connect(('localhost', self.port))
         try:
@@ -308,8 +268,6 @@ class _Messenger(object):
                 'task_id': self.task_id,
                 'exception': exceptions.wrap_if_needed(exception),
                 'traceback': exceptions.get_exception_as_string(*sys.exc_info()),
-                'tracked_changes': tracked_changes or {},
-                'new_instances': new_instances or {}
             })
             response = _recv_message(sock)
             response_exception = response.get('exception')
@@ -319,39 +277,6 @@ class _Messenger(object):
             sock.close()
 
 
-def _patch_ctx(ctx, messenger, instrument):
-    # model will be None only in tests that test the executor component directly
-    if not ctx.model:
-        return
-
-    # We arbitrarily select the ``node`` mapi to extract the session from it.
-    # could have been any other mapi just as well
-    session = ctx.model.node._session
-    original_refresh = session.refresh
-
-    def patched_refresh(target):
-        instrument.clear(target)
-        original_refresh(target)
-
-    def patched_commit():
-        messenger.apply_tracked_changes(instrument.tracked_changes, instrument.new_instances)
-        instrument.expunge_session()
-        instrument.clear()
-
-    def patched_rollback():
-        # Rollback is performed on parent process when commit fails
-        instrument.expunge_session()
-
-    # when autoflush is set to true (the default), refreshing an object will trigger
-    # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so
-    # far on the session. this is not the desired behavior in the subprocess
-    session.autoflush = False
-
-    session.commit = patched_commit
-    session.rollback = patched_rollback
-    session.refresh = patched_refresh
-
-
 def _main():
     arguments_json_path = sys.argv[1]
     with open(arguments_json_path) as f:
@@ -369,32 +294,24 @@ def _main():
     operation_inputs = arguments['operation_inputs']
     context_dict = arguments['context']
 
-    # This is required for the instrumentation work properly.
-    # See docstring of `remove_mutable_association_listener` for further details
-    modeling_types.remove_mutable_association_listener()
     try:
         ctx = context_dict['context_cls'].instantiate_from_dict(**context_dict['context'])
     except BaseException as e:
-        messenger.failed(exception=e, tracked_changes=None, new_instances=None)
+        messenger.failed(e)
         return
 
-    with instrumentation.track_changes(ctx.model) as instrument:
-        try:
-            messenger.started()
-            _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument)
-            task_func = imports.load_attribute(implementation)
-            aria.install_aria_extensions()
-            for decorate in process_executor.decorate():
-                task_func = decorate(task_func)
-            task_func(ctx=ctx, **operation_inputs)
-            messenger.succeeded(tracked_changes=instrument.tracked_changes,
-                                new_instances=instrument.new_instances)
-        except BaseException as e:
-            messenger.failed(exception=e,
-                             tracked_changes=instrument.tracked_changes,
-                             new_instances=instrument.new_instances)
-        finally:
-            instrument.expunge_session()
+    try:
+        messenger.started()
+        task_func = imports.load_attribute(implementation)
+        aria.install_aria_extensions()
+        for decorate in process_executor.decorate():
+            task_func = decorate(task_func)
+        task_func(ctx=ctx, **operation_inputs)
+        ctx.close()
+        messenger.succeeded()
+    except BaseException as e:
+        ctx.close()
+        messenger.failed(e)
 
 if __name__ == '__main__':
     _main()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
deleted file mode 100644
index 390f933..0000000
--- a/aria/storage/instrumentation.py
+++ /dev/null
@@ -1,282 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import copy
-import json
-import os
-
-import sqlalchemy.event
-
-from ..modeling import models as _models
-from ..storage.exceptions import StorageError
-
-
-_VERSION_ID_COL = 'version'
-_STUB = object()
-_INSTRUMENTED = {
-    'modified': {
-        _models.Node.runtime_properties: dict,
-        _models.Node.state: str,
-        _models.Task.status: str,
-    },
-    'new': (_models.Log, )
-
-}
-
-_NEW_INSTANCE = 'NEW_INSTANCE'
-
-
-def track_changes(model=None, instrumented=None):
-    """Track changes in the specified model columns
-
-    This call will register event listeners using sqlalchemy's event mechanism. The listeners
-    instrument all returned objects such that the attributes specified in ``instrumented``, will
-    be replaced with a value that is stored in the returned instrumentation context
-    ``tracked_changes`` property.
-
-    Why should this be implemented when sqlalchemy already does a fantastic job at tracking changes
-    you ask? Well, when sqlalchemy is used with sqlite, due to how sqlite works, only one process
-    can hold a write lock to the database. This does not work well when ARIA runs tasks in
-    subprocesses (by the process executor) and these tasks wish to change some state as well. These
-    tasks certainly deserve a chance to do so!
-
-    To enable this, the subprocess calls ``track_changes()`` before any state changes are made.
-    At the end of the subprocess execution, it should return the ``tracked_changes`` attribute of
-    the instrumentation context returned from this call, to the parent process. The parent process
-    will then call ``apply_tracked_changes()`` that resides in this module as well.
-    At that point, the changes will actually be written back to the database.
-
-    :param model: the model storage. it should hold a mapi for each model. the session of each mapi
-    is needed to setup events
-    :param instrumented: A dict from model columns to their python native type
-    :return: The instrumentation context
-    """
-    return _Instrumentation(model, instrumented or _INSTRUMENTED)
-
-
-class _Instrumentation(object):
-
-    def __init__(self, model, instrumented):
-        self.tracked_changes = {}
-        self.new_instances = {}
-        self.listeners = []
-        self._instances_to_expunge = []
-        self._model = model
-        self._track_changes(instrumented)
-
-    @property
-    def _new_instance_id(self):
-        return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE,
-                                         index=len(self._instances_to_expunge))
-
-    def expunge_session(self):
-        for new_instance in self._instances_to_expunge:
-            self._get_session_from_model(new_instance.__tablename__).expunge(new_instance)
-
-    def _get_session_from_model(self, tablename):
-        mapi = getattr(self._model, tablename, None)
-        if mapi:
-            return mapi._session
-        raise StorageError("Could not retrieve session for {0}".format(tablename))
-
-    def _track_changes(self, instrumented):
-        instrumented_attribute_classes = {}
-        # Track any newly-set attributes.
-        for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
-            self._register_set_attribute_listener(
-                instrumented_attribute=instrumented_attribute,
-                attribute_type=attribute_type)
-            instrumented_class = instrumented_attribute.parent.entity
-            instrumented_class_attributes = instrumented_attribute_classes.setdefault(
-                instrumented_class, {})
-            instrumented_class_attributes[instrumented_attribute.key] = attribute_type
-
-        # Track any global instance update such as 'refresh' or 'load'
-        for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items():
-            self._register_instance_listeners(instrumented_class=instrumented_class,
-                                              instrumented_attributes=instrumented_attributes)
-
-        # Track any newly created instances.
-        for instrumented_class in instrumented.get('new', {}):
-            self._register_new_instance_listener(instrumented_class)
-
-    def _register_new_instance_listener(self, instrumented_class):
-        if self._model is None:
-            raise StorageError("In order to keep track of new instances, a ctx is needed")
-
-        def listener(_, instance):
-            if not isinstance(instance, instrumented_class):
-                return
-            self._instances_to_expunge.append(instance)
-            tracked_instances = self.new_instances.setdefault(instance.__modelname__, {})
-            tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {})
-            instance_as_dict = instance.to_dict()
-            instance_as_dict.update((k, getattr(instance, k))
-                                    for k in getattr(instance, '__private_fields__', []))
-            tracked_attributes.update(instance_as_dict)
-        session = self._get_session_from_model(instrumented_class.__tablename__)
-        listener_args = (session, 'after_attach', listener)
-        sqlalchemy.event.listen(*listener_args)
-        self.listeners.append(listener_args)
-
-    def _register_set_attribute_listener(self, instrumented_attribute, attribute_type):
-        def listener(target, value, *_):
-            mapi_name = target.__modelname__
-            tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
-            tracked_attributes = tracked_instances.setdefault(target.id, {})
-            if value is None:
-                current = None
-            else:
-                current = copy.deepcopy(attribute_type(value))
-            tracked_attributes[instrumented_attribute.key] = _Value(_STUB, current)
-            return current
-        listener_args = (instrumented_attribute, 'set', listener)
-        sqlalchemy.event.listen(*listener_args, retval=True)
-        self.listeners.append(listener_args)
-
-    def _register_instance_listeners(self, instrumented_class, instrumented_attributes):
-        def listener(target, *_):
-            mapi_name = instrumented_class.__modelname__
-            tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
-            tracked_attributes = tracked_instances.setdefault(target.id, {})
-            if hasattr(target, _VERSION_ID_COL):
-                # We want to keep track of the initial version id so it can be compared
-                # with the committed version id when the tracked changes are applied
-                tracked_attributes.setdefault(_VERSION_ID_COL,
-                                              _Value(_STUB, getattr(target, _VERSION_ID_COL)))
-            for attribute_name, attribute_type in instrumented_attributes.items():
-                if attribute_name not in tracked_attributes:
-                    initial = getattr(target, attribute_name)
-                    if initial is None:
-                        current = None
-                    else:
-                        current = copy.deepcopy(attribute_type(initial))
-                    tracked_attributes[attribute_name] = _Value(initial, current)
-                target.__dict__[attribute_name] = tracked_attributes[attribute_name].current
-        for listener_args in ((instrumented_class, 'load', listener),
-                              (instrumented_class, 'refresh', listener),
-                              (instrumented_class, 'refresh_flush', listener)):
-            sqlalchemy.event.listen(*listener_args)
-            self.listeners.append(listener_args)
-
-    def clear(self, target=None):
-        if target:
-            mapi_name = target.__modelname__
-            tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
-            tracked_instances.pop(target.id, None)
-        else:
-            self.tracked_changes.clear()
-
-        self.new_instances.clear()
-        self._instances_to_expunge = []
-
-    def restore(self):
-        """Remove all listeners registered by this instrumentation"""
-        for listener_args in self.listeners:
-            if sqlalchemy.event.contains(*listener_args):
-                sqlalchemy.event.remove(*listener_args)
-
-    def __enter__(self):
-        return self
-
-    def __exit__(self, exc_type, exc_val, exc_tb):
-        self.restore()
-
-
-class _Value(object):
-    # You may wonder why is this a full blown class and not a named tuple. The reason is that
-    # jsonpickle that is used to serialize the tracked_changes, does not handle named tuples very
-    # well. At the very least, I could not get it to behave.
-
-    def __init__(self, initial, current):
-        self.initial = initial
-        self.current = current
-
-    def __eq__(self, other):
-        if not isinstance(other, _Value):
-            return False
-        return self.initial == other.initial and self.current == other.current
-
-    def __hash__(self):
-        return hash((self.initial, self.current))
-
-    @property
-    def dict(self):
-        return {'initial': self.initial, 'current': self.current}.copy()
-
-
-def apply_tracked_changes(tracked_changes, new_instances, model):
-    """Write tracked changes back to the database using provided model storage
-
-    :param tracked_changes: The ``tracked_changes`` attribute of the instrumentation context
-                            returned by calling ``track_changes()``
-    :param model: The model storage used to actually apply the changes
-    """
-    successfully_updated_changes = dict()
-    try:
-        # handle instance updates
-        for mapi_name, tracked_instances in tracked_changes.items():
-            successfully_updated_changes[mapi_name] = dict()
-            mapi = getattr(model, mapi_name)
-            for instance_id, tracked_attributes in tracked_instances.items():
-                successfully_updated_changes[mapi_name][instance_id] = dict()
-                instance = None
-                for attribute_name, value in tracked_attributes.items():
-                    if value.initial != value.current:
-                        instance = instance or mapi.get(instance_id)
-                        setattr(instance, attribute_name, value.current)
-                if instance:
-                    _validate_version_id(instance, mapi)
-                    mapi.update(instance)
-                    successfully_updated_changes[mapi_name][instance_id] = [
-                        v.dict for v in tracked_attributes.values()]
-
-        # Handle new instances
-        for mapi_name, new_instance in new_instances.items():
-            successfully_updated_changes[mapi_name] = dict()
-            mapi = getattr(model, mapi_name)
-            for new_instance_kwargs in new_instance.values():
-                instance = mapi.model_cls(**new_instance_kwargs)
-                mapi.put(instance)
-                successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs
-    except BaseException:
-        for key, value in successfully_updated_changes.items():
-            if not value:
-                del successfully_updated_changes[key]
-        # TODO: if the successful has _STUB, the logging fails because it can't serialize the object
-        model.logger.error(
-            'Registering all the changes to the storage has failed. {0}'
-            'The successful updates were: {0} '
-            '{1}'.format(os.linesep, json.dumps(successfully_updated_changes, indent=4)))
-
-        raise
-
-
-def _validate_version_id(instance, mapi):
-    version_id = sqlalchemy.inspect(instance).committed_state.get(_VERSION_ID_COL)
-    # There are two version conflict code paths:
-    # 1. The instance committed state loaded already holds a newer version,
-    #    in this case, we manually raise the error
-    # 2. The UPDATE statement is executed with version validation and sqlalchemy
-    #    will raise a StateDataError if there is a version mismatch.
-    if version_id and getattr(instance, _VERSION_ID_COL) != version_id:
-        object_version_id = getattr(instance, _VERSION_ID_COL)
-        mapi._session.rollback()
-        raise StorageError(
-            'Version conflict: committed and object {0} differ '
-            '[committed {0}={1}, object {0}={2}]'
-            .format(_VERSION_ID_COL,
-                    version_id,
-                    object_version_id))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/helpers.py
----------------------------------------------------------------------
diff --git a/tests/helpers.py b/tests/helpers.py
index 3c3efc9..4c3194b 100644
--- a/tests/helpers.py
+++ b/tests/helpers.py
@@ -47,6 +47,9 @@ class FilesystemDataHolder(object):
         with open(self._path, 'w') as f:
             return json.dump(value, f)
 
+    def __contains__(self, item):
+        return item in self._load()
+
     def __setitem__(self, key, value):
         dict_ = self._load()
         dict_[key] = value
@@ -67,6 +70,13 @@ class FilesystemDataHolder(object):
         self._dump(dict_)
         return return_value
 
+    def update(self, dict_=None, **kwargs):
+        current_dict = self._load()
+        if dict_:
+            current_dict.update(dict_)
+        current_dict.update(**kwargs)
+        self._dump(current_dict)
+
     @property
     def path(self):
         return self._path

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/mock/models.py
----------------------------------------------------------------------
diff --git a/tests/mock/models.py b/tests/mock/models.py
index f066551..98703d5 100644
--- a/tests/mock/models.py
+++ b/tests/mock/models.py
@@ -120,7 +120,7 @@ def create_node_with_dependencies(include_attribute=False):
     node_template.service_template.services[0] = create_service(node_template.service_template)
     node = create_node(node_template, node_template.service_template.services[0])
     if include_attribute:
-        node.runtime_properties = {'attribute1': 'value1'}
+        node.attributes['attribute1'] = models.Parameter.wrap('attribute1', 'value1')               # pylint: disable=unsubscriptable-object
     return node
 
 
@@ -184,13 +184,10 @@ def create_dependent_node_template(
     )
 
 
-def create_node(dependency_node_template, service, name=NODE_NAME, state=models.Node.INITIAL,
-                runtime_properties=None):
-    runtime_properties = runtime_properties or {}
+def create_node(dependency_node_template, service, name=NODE_NAME, state=models.Node.INITIAL):
     node = models.Node(
         name=name,
         type=dependency_node_template.type,
-        runtime_properties=runtime_properties,
         version=None,
         node_template=dependency_node_template,
         state=state,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/modeling/test_mixins.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_mixins.py b/tests/modeling/test_mixins.py
index a18a04e..2c91a4b 100644
--- a/tests/modeling/test_mixins.py
+++ b/tests/modeling/test_mixins.py
@@ -121,7 +121,6 @@ def test_relationship_model_ordering(context):
     new_node = modeling.models.Node(
         name='new_node',
         type=source_node.type,
-        runtime_properties={},
         service=service,
         version=None,
         node_template=new_node_template,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/modeling/test_models.py
----------------------------------------------------------------------
diff --git a/tests/modeling/test_models.py b/tests/modeling/test_models.py
index 61034bd..57511dd 100644
--- a/tests/modeling/test_models.py
+++ b/tests/modeling/test_models.py
@@ -538,22 +538,20 @@ class TestNodeTemplate(object):
 
 class TestNode(object):
     @pytest.mark.parametrize(
-        'is_valid, name, runtime_properties, state, version',
+        'is_valid, name, state, version',
         [
-            (False, m_cls, {}, 'state', 1),
-            (False, 'name', m_cls, 'state', 1),
-            (False, 'name', {}, 'state', 1),
-            (False, 'name', {}, m_cls, 1),
-            (False, m_cls, {}, 'state', m_cls),
-
-            (True, 'name', {}, 'initial', 1),
-            (True, None, {}, 'initial', 1),
-            (True, 'name', None, 'initial', 1),
-            (True, 'name', {}, 'initial', None),
+            (False, m_cls, 'state', 1),
+            (False, 'name', 'state', 1),
+            (False, 'name', m_cls, 1),
+            (False, m_cls, 'state', m_cls),
+
+            (True, 'name', 'initial', 1),
+            (True, None, 'initial', 1),
+            (True, 'name', 'initial', 1),
+            (True, 'name', 'initial', None),
         ]
     )
-    def test_node_model_creation(self, node_template_storage, is_valid, name, runtime_properties,
-                                 state, version):
+    def test_node_model_creation(self, node_template_storage, is_valid, name, state, version):
         node = _test_model(
             is_valid=is_valid,
             storage=node_template_storage,
@@ -562,7 +560,6 @@ class TestNode(object):
                 node_template=node_template_storage.node_template.list()[0],
                 type=node_template_storage.type.list()[0],
                 name=name,
-                runtime_properties=runtime_properties,
                 state=state,
                 version=version,
                 service=node_template_storage.service.list()[0]
@@ -635,7 +632,6 @@ class TestNodeHostAddress(object):
             name='node',
             node_template=node_template,
             type=storage.type.list()[0],
-            runtime_properties={},
             state='initial',
             service=storage.service.list()[0]
         )
@@ -644,7 +640,7 @@ class TestNodeHostAddress(object):
             if host_address is not None:
                 host_address = host_address.value
         if host_address:
-            kwargs['runtime_properties']['ip'] = host_address
+            kwargs.setdefault('attributes', {})['ip'] = Parameter.wrap('ip', host_address)
         if is_host:
             kwargs['host_fk'] = 1
         elif host_fk:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/context/test_collection_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_collection_instrumentation.py b/tests/orchestrator/context/test_collection_instrumentation.py
new file mode 100644
index 0000000..3ee5a44
--- /dev/null
+++ b/tests/orchestrator/context/test_collection_instrumentation.py
@@ -0,0 +1,253 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import pytest
+
+from aria.modeling.models import Parameter
+from aria.orchestrator.context import collection_instrumentation
+
+
+class MockActor(object):
+    def __init__(self):
+        self.dict_ = {}
+        self.list_ = []
+
+
+class MockModel(object):
+
+    def __init__(self):
+        self.parameter = type('MockModel', (object, ), {'model_cls': Parameter,
+                                                        'put': lambda *args, **kwargs: None,
+                                                        'update': lambda *args, **kwargs: None})()
+
+
+class CollectionInstrumentation(object):
+
+    @pytest.fixture
+    def actor(self):
+        return MockActor()
+
+    @pytest.fixture
+    def model(self):
+        return MockModel()
+
+    @pytest.fixture
+    def dict_(self, actor, model):
+        return collection_instrumentation._InstrumentedDict(model, actor, 'dict_')
+
+    @pytest.fixture
+    def list_(self, actor, model):
+        return collection_instrumentation._InstrumentedList(model, actor, 'list_')
+
+
+class TestDict(CollectionInstrumentation):
+
+    def test_keys(self, actor, dict_):
+        dict_.update(
+            {
+                'key1': Parameter.wrap('key1', 'value1'),
+                'key2': Parameter.wrap('key2', 'value2')
+            }
+        )
+        assert sorted(dict_.keys()) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
+
+    def test_values(self, actor, dict_):
+        dict_.update({
+            'key1': Parameter.wrap('key1', 'value1'),
+            'key2': Parameter.wrap('key1', 'value2')
+        })
+        assert (sorted(dict_.values()) ==
+                sorted(['value1', 'value2']) ==
+                sorted(v.value for v in actor.dict_.values()))
+
+    def test_items(self, dict_):
+        dict_.update({
+            'key1': Parameter.wrap('key1', 'value1'),
+            'key2': Parameter.wrap('key1', 'value2')
+        })
+        assert sorted(dict_.items()) == sorted([('key1', 'value1'), ('key2', 'value2')])
+
+    def test_iter(self, actor, dict_):
+        dict_.update({
+            'key1': Parameter.wrap('key1', 'value1'),
+            'key2': Parameter.wrap('key1', 'value2')
+        })
+        assert sorted(list(dict_)) == sorted(['key1', 'key2']) == sorted(actor.dict_.keys())
+
+    def test_bool(self, dict_):
+        assert not dict_
+        dict_.update({
+            'key1': Parameter.wrap('key1', 'value1'),
+            'key2': Parameter.wrap('key1', 'value2')
+        })
+        assert dict_
+
+    def test_set_item(self, actor, dict_):
+        dict_['key1'] = Parameter.wrap('key1', 'value1')
+        assert dict_['key1'] == 'value1' == actor.dict_['key1'].value
+        assert isinstance(actor.dict_['key1'], Parameter)
+
+    def test_nested(self, actor, dict_):
+        dict_['key'] = {}
+        assert isinstance(actor.dict_['key'], Parameter)
+        assert dict_['key'] == actor.dict_['key'].value == {}
+
+        dict_['key']['inner_key'] = 'value'
+
+        assert len(dict_) == 1
+        assert 'inner_key' in dict_['key']
+        assert dict_['key']['inner_key'] == 'value'
+        assert dict_['key'].keys() == ['inner_key']
+        assert dict_['key'].values() == ['value']
+        assert dict_['key'].items() == [('inner_key', 'value')]
+        assert isinstance(actor.dict_['key'], Parameter)
+        assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
+
+        dict_['key'].update({'updated_key': 'updated_value'})
+        assert len(dict_) == 1
+        assert 'updated_key' in dict_['key']
+        assert dict_['key']['updated_key'] == 'updated_value'
+        assert sorted(dict_['key'].keys()) == sorted(['inner_key', 'updated_key'])
+        assert sorted(dict_['key'].values()) == sorted(['value', 'updated_value'])
+        assert sorted(dict_['key'].items()) == sorted([('inner_key', 'value'),
+                                                       ('updated_key', 'updated_value')])
+        assert isinstance(actor.dict_['key'], Parameter)
+        assert isinstance(dict_['key'], collection_instrumentation._InstrumentedDict)
+
+        dict_.update({'key': 'override_value'})
+        assert len(dict_) == 1
+        assert 'key' in dict_
+        assert dict_['key'] == 'override_value'
+        assert len(actor.dict_) == 1
+        assert isinstance(actor.dict_['key'], Parameter)
+        assert actor.dict_['key'].value == 'override_value'
+
+    def test_get_item(self, actor, dict_):
+        dict_['key1'] = Parameter.wrap('key1', 'value1')
+        assert isinstance(actor.dict_['key1'], Parameter)
+
+    def test_update(self, actor, dict_):
+        dict_['key1'] = 'value1'
+
+        new_dict = {'key2': 'value2'}
+        dict_.update(new_dict)
+        assert len(dict_) == 2
+        assert dict_['key2'] == 'value2'
+        assert isinstance(actor.dict_['key2'], Parameter)
+
+        new_dict = {}
+        new_dict.update(dict_)
+        assert new_dict['key1'] == dict_['key1']
+
+    def test_copy(self, dict_):
+        dict_['key1'] = 'value1'
+
+        new_dict = dict_.copy()
+        assert new_dict is not dict_
+        assert new_dict == dict_
+
+        dict_['key1'] = 'value2'
+        assert new_dict['key1'] == 'value1'
+        assert dict_['key1'] == 'value2'
+
+    def test_clear(self, dict_):
+        dict_['key1'] = 'value1'
+        dict_.clear()
+
+        assert len(dict_) == 0
+
+
+class TestList(CollectionInstrumentation):
+
+    def test_append(self, actor, list_):
+        list_.append(Parameter.wrap('name', 'value1'))
+        list_.append('value2')
+        assert len(actor.list_) == 2
+        assert len(list_) == 2
+        assert isinstance(actor.list_[0], Parameter)
+        assert list_[0] == 'value1'
+
+        assert isinstance(actor.list_[1], Parameter)
+        assert list_[1] == 'value2'
+
+        list_[0] = 'new_value1'
+        list_[1] = 'new_value2'
+        assert isinstance(actor.list_[1], Parameter)
+        assert isinstance(actor.list_[1], Parameter)
+        assert list_[0] == 'new_value1'
+        assert list_[1] == 'new_value2'
+
+    def test_iter(self, list_):
+        list_.append('value1')
+        list_.append('value2')
+        assert sorted(list_) == sorted(['value1', 'value2'])
+
+    def test_insert(self, actor, list_):
+        list_.append('value1')
+        list_.insert(0, 'value2')
+        list_.insert(2, 'value3')
+        list_.insert(10, 'value4')
+        assert sorted(list_) == sorted(['value1', 'value2', 'value3', 'value4'])
+        assert len(actor.list_) == 4
+
+    def test_set(self, list_):
+        list_.append('value1')
+        list_.append('value2')
+
+        list_[1] = 'value3'
+        assert len(list_) == 2
+        assert sorted(list_) == sorted(['value1', 'value3'])
+
+    def test_insert_into_nested(self, actor, list_):
+        list_.append([])
+
+        list_[0].append('inner_item')
+        assert isinstance(actor.list_[0], Parameter)
+        assert len(list_) == 1
+        assert list_[0][0] == 'inner_item'
+
+        list_[0].append('new_item')
+        assert isinstance(actor.list_[0], Parameter)
+        assert len(list_) == 1
+        assert list_[0][1] == 'new_item'
+
+        assert list_[0] == ['inner_item', 'new_item']
+        assert ['inner_item', 'new_item'] == list_[0]
+
+
+class TestDictList(CollectionInstrumentation):
+    def test_dict_in_list(self, actor, list_):
+        list_.append({})
+        assert len(list_) == 1
+        assert isinstance(actor.list_[0], Parameter)
+        assert actor.list_[0].value == {}
+
+        list_[0]['key'] = 'value'
+        assert list_[0]['key'] == 'value'
+        assert len(actor.list_) == 1
+        assert isinstance(actor.list_[0], Parameter)
+        assert actor.list_[0].value['key'] == 'value'
+
+    def test_list_in_dict(self, actor, dict_):
+        dict_['key'] = []
+        assert len(dict_) == 1
+        assert isinstance(actor.dict_['key'], Parameter)
+        assert actor.dict_['key'].value == []
+
+        dict_['key'].append('value')
+        assert dict_['key'][0] == 'value'
+        assert len(actor.dict_) == 1
+        assert isinstance(actor.dict_['key'], Parameter)
+        assert actor.dict_['key'].value[0] == 'value'

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/context/test_operation.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_operation.py b/tests/orchestrator/context/test_operation.py
index cdeb5fa..5d193bc 100644
--- a/tests/orchestrator/context/test_operation.py
+++ b/tests/orchestrator/context/test_operation.py
@@ -343,6 +343,74 @@ def test_relationship_operation_logging(ctx, executor):
     _assert_loggins(ctx, inputs)
 
 
+def test_attribute_consumption(ctx, executor, dataholder):
+    # region Updating node operation
+    node_int_name, node_op_name = mock.operations.NODE_OPERATIONS_INSTALL[0]
+
+    source_node = ctx.model.node.get_by_name(mock.models.DEPENDENT_NODE_NAME)
+
+    inputs = {'dict_': {'key': 'value'},
+              'set_test_dict': {'key2': 'value2'}}
+    interface = mock.models.create_interface(
+        source_node.service,
+        node_int_name,
+        node_op_name,
+        operation_kwargs=dict(
+            implementation=op_path(attribute_altering_operation, module_path=__name__),
+            inputs=inputs)
+    )
+    source_node.interfaces[interface.name] = interface
+    ctx.model.node.update(source_node)
+    # endregion
+
+    # region updating relationship operation
+    rel_int_name, rel_op_name = mock.operations.RELATIONSHIP_OPERATIONS_INSTALL[2]
+
+    relationship = ctx.model.relationship.list()[0]
+    interface = mock.models.create_interface(
+        relationship.source_node.service,
+        rel_int_name,
+        rel_op_name,
+        operation_kwargs=dict(
+            implementation=op_path(attribute_consuming_operation, module_path=__name__),
+            inputs={'holder_path': dataholder.path}
+        )
+    )
+    relationship.interfaces[interface.name] = interface
+    ctx.model.relationship.update(relationship)
+    # endregion
+
+    @workflow
+    def basic_workflow(graph, **_):
+        graph.sequence(
+            api.task.OperationTask(
+                source_node,
+                interface_name=node_int_name,
+                operation_name=node_op_name,
+                inputs=inputs
+            ),
+            api.task.OperationTask(
+                relationship,
+                interface_name=rel_int_name,
+                operation_name=rel_op_name,
+            )
+        )
+
+    execute(workflow_func=basic_workflow, workflow_context=ctx, executor=executor)
+    target_node = ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
+
+    assert len(source_node.attributes) == len(target_node.attributes) == 2
+    assert source_node.attributes['key'] != target_node.attributes['key']
+    assert source_node.attributes['key'].value == \
+           target_node.attributes['key'].value == \
+           dataholder['key'] == 'value'
+
+    assert source_node.attributes['key2'] != target_node.attributes['key2']
+    assert source_node.attributes['key2'].value == \
+           target_node.attributes['key2'].value == \
+           dataholder['key2'] == 'value2'
+
+
 def _assert_loggins(ctx, inputs):
 
     # The logs should contain the following: Workflow Start, Operation Start, custom operation
@@ -377,10 +445,10 @@ def _assert_loggins(ctx, inputs):
 
 @operation
 def logged_operation(ctx, **_):
-    ctx.logger.info(ctx.task.inputs['op_start'])
+    ctx.logger.info(ctx.task.inputs['op_start'].value)
     # enables to check the relation between the created_at field properly
     time.sleep(1)
-    ctx.logger.debug(ctx.task.inputs['op_end'])
+    ctx.logger.debug(ctx.task.inputs['op_end'].value)
 
 
 @operation
@@ -422,3 +490,21 @@ def get_node_id(ctx, holder_path, **_):
 def _test_plugin_workdir(ctx, filename, content):
     with open(os.path.join(ctx.plugin_workdir, filename), 'w') as f:
         f.write(content)
+
+
+@operation
+def attribute_altering_operation(ctx, dict_, set_test_dict, **_):
+    ctx.node.attributes.update(dict_)
+
+    for key, value in set_test_dict.items():
+        ctx.node.attributes[key] = value
+
+
+@operation
+def attribute_consuming_operation(ctx, holder_path, **_):
+    holder = helpers.FilesystemDataHolder(holder_path)
+    ctx.target_node.attributes.update(ctx.source_node.attributes)
+    holder.update(**ctx.target_node.attributes)
+
+    ctx.target_node.attributes['key2'] = ctx.source_node.attributes['key2']
+    holder['key2'] = ctx.target_node.attributes['key2']

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index d199954..fc34907 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -16,6 +16,7 @@
 import pytest
 
 from aria import workflow, operation
+from aria.modeling import models
 from aria.orchestrator import context
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.executor import thread
@@ -93,7 +94,7 @@ def test_host_ip(workflow_context, executor, dataholder):
         operation_kwargs=dict(implementation=op_path(host_ip, module_path=__name__), inputs=inputs)
     )
     dependency_node.interfaces[interface.name] = interface
-    dependency_node.runtime_properties['ip'] = '1.1.1.1'
+    dependency_node.attributes['ip'] = models.Parameter.wrap('ip', '1.1.1.1')
 
     workflow_context.model.node.update(dependency_node)
 
@@ -110,7 +111,7 @@ def test_host_ip(workflow_context, executor, dataholder):
 
     execute(workflow_func=basic_workflow, workflow_context=workflow_context, executor=executor)
 
-    assert dataholder.get('host_ip') == dependency_node.runtime_properties.get('ip')
+    assert dataholder.get('host_ip') == dependency_node.attributes.get('ip').value
 
 
 def test_relationship_tool_belt(workflow_context, executor, dataholder):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 09d0499..d9115e1 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -43,26 +43,26 @@ class TestLocalRunScript(object):
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties map.key value
+            ctx node attributes map.key value
             ''',
             windows_script='''
-            ctx node runtime-properties map.key value
+            ctx node attributes map.key value
         ''')
         props = self._run(
             executor, workflow_context,
             script_path=script_path)
-        assert props['map']['key'] == 'value'
+        assert props['map'].value['key'] == 'value'
 
     def test_process_env(self, executor, workflow_context, tmpdir):
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties map.key1 $key1
-            ctx node runtime-properties map.key2 $key2
+            ctx node attributes map.key1 $key1
+            ctx node attributes map.key2 $key2
             ''',
             windows_script='''
-            ctx node runtime-properties map.key1 %key1%
-            ctx node runtime-properties map.key2 %key2%
+            ctx node attributes map.key1 %key1%
+            ctx node attributes map.key2 %key2%
         ''')
         props = self._run(
             executor, workflow_context,
@@ -73,7 +73,7 @@ class TestLocalRunScript(object):
                     'key2': 'value2'
                 }
             })
-        p_map = props['map']
+        p_map = props['map'].value
         assert p_map['key1'] == 'value1'
         assert p_map['key2'] == 'value2'
 
@@ -81,10 +81,10 @@ class TestLocalRunScript(object):
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties map.cwd $PWD
+            ctx node attributes map.cwd $PWD
             ''',
             windows_script='''
-            ctx node runtime-properties map.cwd %CD%
+            ctx node attributes map.cwd %CD%
             ''')
         tmpdir = str(tmpdir)
         props = self._run(
@@ -93,11 +93,11 @@ class TestLocalRunScript(object):
             process={
                 'cwd': tmpdir
             })
-        p_map = props['map']
+        p_map = props['map'].value
         assert p_map['cwd'] == tmpdir
 
     def test_process_command_prefix(self, executor, workflow_context, tmpdir):
-        use_ctx = 'ctx node runtime-properties map.key value'
+        use_ctx = 'ctx node attributes map.key value'
         python_script = ['import subprocess',
                          'subprocess.Popen("{0}".split(' ')).communicate()[0]'.format(use_ctx)]
         python_script = '\n'.join(python_script)
@@ -114,19 +114,19 @@ class TestLocalRunScript(object):
                 'env': {'TEST_KEY': 'value'},
                 'command_prefix': 'python'
             })
-        p_map = props['map']
+        p_map = props['map'].value
         assert p_map['key'] == 'value'
 
     def test_process_args(self, executor, workflow_context, tmpdir):
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties map.arg1 "$1"
-            ctx node runtime-properties map.arg2 $2
+            ctx node attributes map.arg1 "$1"
+            ctx node attributes map.arg2 $2
             ''',
             windows_script='''
-            ctx node runtime-properties map.arg1 %1
-            ctx node runtime-properties map.arg2 %2
+            ctx node attributes map.arg1 %1
+            ctx node attributes map.arg2 %2
             ''')
         props = self._run(
             executor, workflow_context,
@@ -134,8 +134,8 @@ class TestLocalRunScript(object):
             process={
                 'args': ['"arg with spaces"', 'arg2']
             })
-        assert props['map']['arg1'] == 'arg with spaces'
-        assert props['map']['arg2'] == 'arg2'
+        assert props['map'].value['arg1'] == 'arg with spaces'
+        assert props['map'].value['arg2'] == 'arg2'
 
     def test_no_script_path(self, executor, workflow_context):
         exception = self._run_and_get_task_exception(
@@ -187,7 +187,7 @@ class TestLocalRunScript(object):
         script = '''
 from aria.orchestrator.execution_plugin import ctx, inputs
 if __name__ == '__main__':
-    ctx.node.runtime_properties['key'] = inputs['key']
+    ctx.node.attributes['key'] = inputs['key']
 '''
         suffix = '.py'
         script_path = self._create_script(
@@ -200,7 +200,7 @@ if __name__ == '__main__':
             executor, workflow_context,
             script_path=script_path,
             inputs={'key': 'value'})
-        assert props['key'] == 'value'
+        assert props['key'].value == 'value'
 
     @pytest.mark.parametrize(
         'value', ['string-value', [1, 2, 3], 999, 3.14, False,
@@ -209,16 +209,17 @@ if __name__ == '__main__':
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties key "${input_as_env_var}"
+            ctx node attributes key "${input_as_env_var}"
             ''',
             windows_script='''
-            ctx node runtime-properties key "%input_as_env_var%"
+            ctx node attributes key "%input_as_env_var%"
         ''')
         props = self._run(
             executor, workflow_context,
             script_path=script_path,
             env_var=value)
-        expected = props['key'] if isinstance(value, basestring) else json.loads(props['key'])
+        value = props['key'].value
+        expected = value if isinstance(value, basestring) else json.loads(value)
         assert expected == value
 
     @pytest.mark.parametrize('value', ['override', {'key': 'value'}])
@@ -227,10 +228,10 @@ if __name__ == '__main__':
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties key "${input_as_env_var}"
+            ctx node attributes key "${input_as_env_var}"
             ''',
             windows_script='''
-            ctx node runtime-properties key "%input_as_env_var%"
+            ctx node attributes key "%input_as_env_var%"
         ''')
 
         props = self._run(
@@ -242,17 +243,18 @@ if __name__ == '__main__':
                     'input_as_env_var': value
                 }
             })
-        expected = props['key'] if isinstance(value, basestring) else json.loads(props['key'])
+        value = props['key'].value
+        expected = value if isinstance(value, basestring) else json.loads(value)
         assert expected == value
 
     def test_get_nonexistent_runtime_property(self, executor, workflow_context, tmpdir):
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx node runtime-properties nonexistent
+            ctx node attributes nonexistent
             ''',
             windows_script='''
-            ctx node runtime-properties nonexistent
+            ctx node attributes nonexistent
         ''')
         exception = self._run_and_get_task_exception(
             executor, workflow_context,
@@ -266,10 +268,10 @@ if __name__ == '__main__':
         script_path = self._create_script(
             tmpdir,
             linux_script='''#! /bin/bash -e
-            ctx -j instance runtime-properties nonexistent
+            ctx -j instance attributes nonexistent
             ''',
             windows_script='''
-            ctx -j instance runtime-properties nonexistent
+            ctx -j instance attributes nonexistent
             ''')
         exception = self._run_and_get_task_exception(
             executor, workflow_context,
@@ -502,7 +504,7 @@ if __name__ == '__main__':
             tasks_graph=tasks_graph)
         eng.execute()
         return workflow_context.model.node.get_by_name(
-            mock.models.DEPENDENCY_NODE_NAME).runtime_properties
+            mock.models.DEPENDENCY_NODE_NAME).attributes
 
     @pytest.fixture
     def executor(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/execution_plugin/test_ssh.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_ssh.py b/tests/orchestrator/execution_plugin/test_ssh.py
index a9dc5e8..92d250e 100644
--- a/tests/orchestrator/execution_plugin/test_ssh.py
+++ b/tests/orchestrator/execution_plugin/test_ssh.py
@@ -53,9 +53,9 @@ _FABRIC_ENV = {
 class TestWithActualSSHServer(object):
 
     def test_run_script_basic(self):
-        expected_runtime_property_value = 'some_value'
-        props = self._execute(env={'test_value': expected_runtime_property_value})
-        assert props['test_value'] == expected_runtime_property_value
+        expected_attribute_value = 'some_value'
+        props = self._execute(env={'test_value': expected_attribute_value})
+        assert props['test_value'].value == expected_attribute_value
 
     @pytest.mark.skip(reason='sudo privileges are required')
     def test_run_script_as_sudo(self):
@@ -66,7 +66,7 @@ class TestWithActualSSHServer(object):
 
     def test_run_script_default_base_dir(self):
         props = self._execute()
-        assert props['work_dir'] == '{0}/work'.format(constants.DEFAULT_BASE_DIR)
+        assert props['work_dir'].value == '{0}/work'.format(constants.DEFAULT_BASE_DIR)
 
     @pytest.mark.skip(reason='Re-enable once output from process executor can be captured')
     @pytest.mark.parametrize('hide_groups', [[], ['everything']])
@@ -93,16 +93,16 @@ class TestWithActualSSHServer(object):
                 'cwd': expected_cwd,
                 'base_dir': expected_base_dir
             })
-        assert props['env_value'] == expected_env_value
-        assert len(props['bash_version']) > 0
-        assert props['arg1_value'] == expected_arg1_value
-        assert props['arg2_value'] == expected_arg2_value
-        assert props['cwd'] == expected_cwd
-        assert props['ctx_path'] == '{0}/ctx'.format(expected_base_dir)
+        assert props['env_value'].value == expected_env_value
+        assert len(props['bash_version'].value) > 0
+        assert props['arg1_value'].value == expected_arg1_value
+        assert props['arg2_value'].value == expected_arg2_value
+        assert props['cwd'].value == expected_cwd
+        assert props['ctx_path'].value == '{0}/ctx'.format(expected_base_dir)
 
     def test_run_script_command_prefix(self):
         props = self._execute(process={'command_prefix': 'bash -i'})
-        assert 'i' in props['dollar_dash']
+        assert 'i' in props['dollar_dash'].value
 
     def test_run_script_reuse_existing_ctx(self):
         expected_test_value_1 = 'test_value_1'
@@ -112,27 +112,27 @@ class TestWithActualSSHServer(object):
                              '{0}_2'.format(self.test_name)],
             env={'test_value1': expected_test_value_1,
                  'test_value2': expected_test_value_2})
-        assert props['test_value1'] == expected_test_value_1
-        assert props['test_value2'] == expected_test_value_2
+        assert props['test_value1'].value == expected_test_value_1
+        assert props['test_value2'].value == expected_test_value_2
 
     def test_run_script_download_resource_plain(self, tmpdir):
         resource = tmpdir.join('resource')
         resource.write('content')
         self._upload(str(resource), 'test_resource')
         props = self._execute()
-        assert props['test_value'] == 'content'
+        assert props['test_value'].value == 'content'
 
     def test_run_script_download_resource_and_render(self, tmpdir):
         resource = tmpdir.join('resource')
         resource.write('{{ctx.service.name}}')
         self._upload(str(resource), 'test_resource')
         props = self._execute()
-        assert props['test_value'] == self._workflow_context.service.name
+        assert props['test_value'].value == self._workflow_context.service.name
 
     @pytest.mark.parametrize('value', ['string-value', [1, 2, 3], {'key': 'value'}])
     def test_run_script_inputs_as_env_variables_no_override(self, value):
         props = self._execute(custom_input=value)
-        return_value = props['test_value']
+        return_value = props['test_value'].value
         expected = return_value if isinstance(value, basestring) else json.loads(return_value)
         assert value == expected
 
@@ -140,7 +140,7 @@ class TestWithActualSSHServer(object):
     def test_run_script_inputs_as_env_variables_process_env_override(self, value):
         props = self._execute(custom_input='custom-input-value',
                               env={'custom_env_var': value})
-        return_value = props['test_value']
+        return_value = props['test_value'].value
         expected = return_value if isinstance(value, basestring) else json.loads(return_value)
         assert value == expected
 
@@ -260,7 +260,7 @@ class TestWithActualSSHServer(object):
             tasks_graph=tasks_graph)
         eng.execute()
         return self._workflow_context.model.node.get_by_name(
-            mock.models.DEPENDENCY_NODE_NAME).runtime_properties
+            mock.models.DEPENDENCY_NODE_NAME).attributes
 
     def _execute_and_get_task_exception(self, *args, **kwargs):
         signal = events.on_failure_task_signal

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/workflows/core/test_task.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py
index 50ca7f5..e488933 100644
--- a/tests/orchestrator/workflows/core/test_task.py
+++ b/tests/orchestrator/workflows/core/test_task.py
@@ -100,7 +100,7 @@ class TestOperationTask(object):
         storage_task = ctx.model.task.get_by_name(core_task.name)
         assert storage_task.plugin is storage_plugin
         assert storage_task.execution_name == ctx.execution.name
-        assert storage_task.actor == core_task.context.node
+        assert storage_task.actor == core_task.context.node._original_model
         assert core_task.model_task == storage_task
         assert core_task.name == api_task.name
         assert core_task.implementation == api_task.implementation

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/__init__.py b/tests/orchestrator/workflows/executor/__init__.py
index 375c44e..41c4b2e 100644
--- a/tests/orchestrator/workflows/executor/__init__.py
+++ b/tests/orchestrator/workflows/executor/__init__.py
@@ -74,3 +74,7 @@ class MockContext(object):
             return cls(storage=aria.application_model_storage(**kwargs))
         else:
             return cls()
+
+    @staticmethod
+    def close():
+        pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
index 1dbfae1..92f0fc4 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_concurrent_modifications.py
@@ -17,7 +17,6 @@ import time
 import fasteners
 import pytest
 
-from aria.storage.exceptions import StorageError
 from aria.orchestrator import events
 from aria.orchestrator.workflows.exceptions import ExecutorException
 from aria.orchestrator.workflows import api
@@ -29,47 +28,37 @@ from tests.orchestrator.context import execute as execute_workflow
 from tests.orchestrator.workflows.helpers import events_collector
 from tests import mock
 from tests import storage
+from tests import helpers
 
 
-def test_concurrent_modification_on_task_succeeded(context, executor, lock_files):
-    _test(context, executor, lock_files, _test_task_succeeded, expected_failure=True)
-
-
-@operation
-def _test_task_succeeded(ctx, lock_files, key, first_value, second_value):
-    _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
+@pytest.fixture
+def dataholder(tmpdir):
+    dataholder_path = str(tmpdir.join('dataholder'))
+    holder = helpers.FilesystemDataHolder(dataholder_path)
+    return holder
 
 
-def test_concurrent_modification_on_task_failed(context, executor, lock_files):
-    _test(context, executor, lock_files, _test_task_failed, expected_failure=True)
+def test_concurrent_modification_on_task_succeeded(context, executor, lock_files, dataholder):
+    _test(context, executor, lock_files, _test_task_succeeded, dataholder, expected_failure=False)
 
 
 @operation
-def _test_task_failed(ctx, lock_files, key, first_value, second_value):
-    first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value)
-    if not first:
-        raise RuntimeError('MESSAGE')
+def _test_task_succeeded(ctx, lock_files, key, first_value, second_value, holder_path):
+    _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
 
 
-def test_concurrent_modification_on_update_and_refresh(context, executor, lock_files):
-    _test(context, executor, lock_files, _test_update_and_refresh, expected_failure=False)
+def test_concurrent_modification_on_task_failed(context, executor, lock_files, dataholder):
+    _test(context, executor, lock_files, _test_task_failed, dataholder, expected_failure=True)
 
 
 @operation
-def _test_update_and_refresh(ctx, lock_files, key, first_value, second_value):
-    node = ctx.node
-    first = _concurrent_update(lock_files, node, key, first_value, second_value)
+def _test_task_failed(ctx, lock_files, key, first_value, second_value, holder_path):
+    first = _concurrent_update(lock_files, ctx.node, key, first_value, second_value, holder_path)
     if not first:
-        try:
-            ctx.model.node.update(node)
-        except StorageError as e:
-            assert 'Version conflict' in str(e)
-            ctx.model.node.refresh(node)
-        else:
-            raise RuntimeError('Unexpected')
+        raise RuntimeError('MESSAGE')
 
 
-def _test(context, executor, lock_files, func, expected_failure):
+def _test(context, executor, lock_files, func, dataholder, expected_failure):
     def _node(ctx):
         return ctx.model.node.get_by_name(mock.models.DEPENDENCY_NODE_NAME)
 
@@ -82,7 +71,8 @@ def _test(context, executor, lock_files, func, expected_failure):
         'lock_files': lock_files,
         'key': key,
         'first_value': first_value,
-        'second_value': second_value
+        'second_value': second_value,
+        'holder_path': dataholder.path
     }
 
     node = _node(context)
@@ -118,17 +108,13 @@ def _test(context, executor, lock_files, func, expected_failure):
         except ExecutorException:
             pass
 
-    props = _node(context).runtime_properties
-    assert props[key] == first_value
+    props = _node(context).attributes
+    assert dataholder['invocations'] == 2
+    assert props[key].value == dataholder[key]
 
     exceptions = [event['kwargs']['exception'] for event in collected.get(signal, [])]
     if expected_failure:
         assert exceptions
-        exception = exceptions[-1]
-        assert isinstance(exception, StorageError)
-        assert 'Version conflict' in str(exception)
-    else:
-        assert not exceptions
 
 
 @pytest.fixture
@@ -150,8 +136,8 @@ def lock_files(tmpdir):
     return str(tmpdir.join('first_lock_file')), str(tmpdir.join('second_lock_file'))
 
 
-def _concurrent_update(lock_files, node, key, first_value, second_value):
-
+def _concurrent_update(lock_files, node, key, first_value, second_value, holder_path):
+    holder = helpers.FilesystemDataHolder(holder_path)
     locker1 = fasteners.InterProcessLock(lock_files[0])
     locker2 = fasteners.InterProcessLock(lock_files[1])
 
@@ -161,11 +147,14 @@ def _concurrent_update(lock_files, node, key, first_value, second_value):
         # Give chance for both processes to acquire locks
         while locker2.acquire(blocking=False):
             locker2.release()
-            time.sleep(0.01)
+            time.sleep(0.1)
     else:
         locker2.acquire()
 
-    node.runtime_properties[key] = first_value if first else second_value
+    node.attributes[key] = first_value if first else second_value
+    holder['key'] = first_value if first else second_value
+    holder.setdefault('invocations', 0)
+    holder['invocations'] += 1
 
     if first:
         locker1.release()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/50b997e3/tests/orchestrator/workflows/executor/test_process_executor_extension.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_process_executor_extension.py b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
index 878ac24..30b23ed 100644
--- a/tests/orchestrator/workflows/executor/test_process_executor_extension.py
+++ b/tests/orchestrator/workflows/executor/test_process_executor_extension.py
@@ -56,7 +56,7 @@ def test_decorate_extension(context, executor):
     graph = mock_workflow(ctx=context)  # pylint: disable=no-value-for-parameter
     eng = engine.Engine(executor=executor, workflow_context=context, tasks_graph=graph)
     eng.execute()
-    out = get_node(context).runtime_properties['out']
+    out = get_node(context).attributes.get('out').value
     assert out['wrapper_inputs'] == inputs
     assert out['function_inputs'] == inputs
 
@@ -67,7 +67,7 @@ class MockProcessExecutorExtension(object):
     def decorate(self):
         def decorator(function):
             def wrapper(ctx, **operation_inputs):
-                ctx.node.runtime_properties['out'] = {'wrapper_inputs': operation_inputs}
+                ctx.node.attributes['out'] = {'wrapper_inputs': operation_inputs}
                 function(ctx=ctx, **operation_inputs)
             return wrapper
         return decorator
@@ -75,7 +75,7 @@ class MockProcessExecutorExtension(object):
 
 @operation
 def _mock_operation(ctx, **operation_inputs):
-    ctx.node.runtime_properties['out']['function_inputs'] = operation_inputs
+    ctx.node.attributes['out']['function_inputs'] = operation_inputs
 
 
 @pytest.fixture


Mime
View raw message