ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: code review 1
Date Thu, 11 May 2017 13:45:39 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue a37af2a8f -> 9d0f37ec5


code review 1


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/9d0f37ec
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/9d0f37ec
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/9d0f37ec

Branch: refs/heads/ARIA-213-Sporadic-tests-failures-over-locked-database-issue
Commit: 9d0f37ec569585206eee8a99756b02688e6f6527
Parents: a37af2a
Author: max-orlov <maxim@gigaspaces.com>
Authored: Thu May 11 16:45:35 2017 +0300
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Thu May 11 16:45:35 2017 +0300

----------------------------------------------------------------------
 aria/logger.py                                  |  7 +--
 aria/orchestrator/workflows/executor/process.py |  7 ++-
 aria/storage/instrumentation.py                 | 59 +++++++++++---------
 .../workflows/executor/test_executor.py         |  2 +-
 tests/storage/test_instrumentation.py           |  8 +--
 5 files changed, 41 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9d0f37ec/aria/logger.py
----------------------------------------------------------------------
diff --git a/aria/logger.py b/aria/logger.py
index 9214bd9..bd7ed4e 100644
--- a/aria/logger.py
+++ b/aria/logger.py
@@ -118,12 +118,7 @@ def create_sqla_log_handler(model, log_cls, execution_id, level=logging.DEBUG):
 
     # This is needed since the engine and session are entirely new we need to reflect the db
     # schema of the logging model into the engine and session.
-    log_cls.__table__.create(bind=model.log._engine, checkfirst=True)
-
-    return _SQLAlchemyHandler(model=model,
-                              log_cls=log_cls,
-                              execution_id=execution_id,
-                              level=level)
+    return _SQLAlchemyHandler(model=model, log_cls=log_cls, execution_id=execution_id, level=level)
 
 
 class _DefaultConsoleFormat(logging.Formatter):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9d0f37ec/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index e8cf019..824c4e1 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -340,7 +340,7 @@ def _patch_ctx(ctx, messenger, instrument):
 
     def patched_rollback():
         # Rollback is performed on parent process when commit fails
-        pass
+        instrument.expunge_session()
 
     # when autoflush is set to true (the default), refreshing an object will trigger
     # an auto flush by sqlalchemy, this autoflush will attempt to commit changes made so
@@ -378,7 +378,7 @@ def _main():
         messenger.failed(exception=e, tracked_changes=None, new_instances=None)
         return
 
-    with instrumentation.track_changes(ctx) as instrument:
+    with instrumentation.track_changes(ctx.model) as instrument:
         try:
             messenger.started()
             _patch_ctx(ctx=ctx, messenger=messenger, instrument=instrument)
@@ -393,7 +393,8 @@ def _main():
             messenger.failed(exception=e,
                              tracked_changes=instrument.tracked_changes,
                              new_instances=instrument.new_instances)
-
+        finally:
+            instrument.expunge_session()
 
 if __name__ == '__main__':
     _main()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9d0f37ec/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 6af9473..23f6fd0 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -14,6 +14,8 @@
 # limitations under the License.
 
 import copy
+import json
+import os
 
 import sqlalchemy.event
 
@@ -36,7 +38,7 @@ _INSTRUMENTED = {
 _NEW_INSTANCE = 'NEW_INSTANCE'
 
 
-def track_changes(ctx=None, instrumented=None):
+def track_changes(model=None, instrumented=None):
     """Track changes in the specified model columns
 
     This call will register event listeners using sqlalchemy's event mechanism. The listeners
@@ -55,41 +57,43 @@ def track_changes(ctx=None, instrumented=None):
     the instrumentation context returned from this call, to the parent process. The parent process
     will then call ``apply_tracked_changes()`` that resides in this module as well.
     At that point, the changes will actually be written back to the database.
-
+    
+    :param model: the model storage. it should hold a mapi for each model. the session of each mapi
+    is needed to setup events
     :param instrumented: A dict from model columns to their python native type
     :return: The instrumentation context
     """
-    return _Instrumentation(ctx, instrumented or _INSTRUMENTED)
+    return _Instrumentation(model, instrumented or _INSTRUMENTED)
 
 
 class _Instrumentation(object):
 
-    def __init__(self, ctx, instrumented):
+    def __init__(self, model, instrumented):
         self.tracked_changes = {}
         self.new_instances = {}
         self.listeners = []
         self._new_modeled_instances = []
-        self._ctx = ctx
+        self._model = model
         self._track_changes(instrumented)
-        self._new_instance_index = 0
 
     @property
     def _new_instance_id(self):
-        self._new_instance_index += 1
-        return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE, index=self._new_instance_index - 1)
+        return '{prefix}_{index}'.format(prefix=_NEW_INSTANCE,
+                                         index=len(self._new_modeled_instances))
 
     def expunge_session(self):
         for new_instance in self._new_modeled_instances:
-            self._get_session_from_ctx(new_instance.__tablename__).expunge(new_instance)
+            self._get_session_from_model(new_instance.__tablename__).expunge(new_instance)
 
-    def _get_session_from_ctx(self, tablename):
-        mapi = getattr(self._ctx.model, tablename, None)
+    def _get_session_from_model(self, tablename):
+        mapi = getattr(self._model, tablename, None)
         if mapi:
             return mapi._session
         raise StorageError("Could not retrieve session for {0}".format(tablename))
 
     def _track_changes(self, instrumented):
         instrumented_attribute_classes = {}
+        # Track any newly-set attributes.
         for instrumented_attribute, attribute_type in instrumented.get('modified', {}).items():
             self._register_set_attribute_listener(
                 instrumented_attribute=instrumented_attribute,
@@ -98,25 +102,24 @@ class _Instrumentation(object):
             instrumented_class_attributes = instrumented_attribute_classes.setdefault(
                 instrumented_class, {})
             instrumented_class_attributes[instrumented_attribute.key] = attribute_type
+
+        # Track any global instance update such as 'refresh' or 'load'
         for instrumented_class, instrumented_attributes in instrumented_attribute_classes.items():
-            self._register_instance_attribute_listeners(
-                instrumented_class=instrumented_class,
-                instrumented_attributes=instrumented_attributes)
+            self._register_instance_listeners(instrumented_class=instrumented_class,
+                                              instrumented_attributes=instrumented_attributes)
 
-        # instrument creation of new instances
+        # Track any newly created instances.
         for instrumented_class in instrumented.get('new', {}):
-            self._register_instance_listener(instrumented_class)
+            self._register_new_instance_listener(instrumented_class)
 
-    def _register_instance_listener(self, instrumented_class):
-        if self._ctx is None:
+    def _register_new_instance_listener(self, instrumented_class):
+        if self._model is None:
             if instrumented_class:
                 raise StorageError("In order to keep track of new instances, a ctx is needed")
             else:
                 return
 
         def listener(_, instance):
-            if not isinstance(instance, instrumented_class):
-                return
             self._new_modeled_instances.append(instance)
             tracked_instances = self.new_instances.setdefault(instance.__modelname__, {})
             tracked_attributes = tracked_instances.setdefault(self._new_instance_id, {})
@@ -124,7 +127,7 @@ class _Instrumentation(object):
             instance_as_dict.update((k, getattr(instance, k))
                                     for k in getattr(instance, '__private_fields__', []))
             tracked_attributes.update(instance_as_dict)
-        session = self._get_session_from_ctx(instrumented_class.__tablename__)
+        session = self._get_session_from_model(instrumented_class.__tablename__)
         listener_args = (session, 'after_attach', listener)
         sqlalchemy.event.listen(*listener_args)
         self.listeners.append(listener_args)
@@ -144,7 +147,7 @@ class _Instrumentation(object):
         sqlalchemy.event.listen(*listener_args, retval=True)
         self.listeners.append(listener_args)
 
-    def _register_instance_attribute_listeners(self, instrumented_class, instrumented_attributes):
+    def _register_instance_listeners(self, instrumented_class, instrumented_attributes):
         def listener(target, *_):
             mapi_name = instrumented_class.__modelname__
             tracked_instances = self.tracked_changes.setdefault(mapi_name, {})
@@ -224,6 +227,7 @@ def apply_tracked_changes(tracked_changes, new_instances, model):
     """
     successfully_updated_changes = dict()
     try:
+        # handle instance updates
         for mapi_name, tracked_instances in tracked_changes.items():
             successfully_updated_changes[mapi_name] = dict()
             mapi = getattr(model, mapi_name)
@@ -240,10 +244,11 @@ def apply_tracked_changes(tracked_changes, new_instances, model):
                     successfully_updated_changes[mapi_name][instance_id] = [
                         v.dict for v in tracked_attributes.values()]
 
+        # Handle new instances
         for mapi_name, new_instance in new_instances.items():
             successfully_updated_changes[mapi_name] = dict()
             mapi = getattr(model, mapi_name)
-            for new_instance_kwargs in sorted(new_instance.values()):
+            for new_instance_kwargs in new_instance.values():
                 instance = mapi.model_cls(**new_instance_kwargs)
                 mapi.put(instance)
                 successfully_updated_changes[mapi_name][instance.id] = new_instance_kwargs
@@ -252,10 +257,10 @@ def apply_tracked_changes(tracked_changes, new_instances, model):
             if not value:
                 del successfully_updated_changes[key]
         # TODO: if the successful has _STUB, the logging fails because it can't serialize the object
-        # model.logger.error(
-        #     'Registering all the changes to the storage has failed. {0}'
-        #     'The successful updates were: {0} '
-        #     '{1}'.format(os.linesep, json.dumps(successfully_updated_changes, indent=4)))
+        model.logger.error(
+            'Registering all the changes to the storage has failed. {0}'
+            'The successful updates were: {0} '
+            '{1}'.format(os.linesep, json.dumps(successfully_updated_changes, indent=4)))
 
         raise
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9d0f37ec/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index 47604e9..29cb0e8 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -55,7 +55,7 @@ def execute_and_assert(executor, storage=None):
 
     @retrying.retry(stop_max_delay=10000, wait_fixed=100)
     def assertion():
-        # assert successful_task.states == ['start', 'success']
+        assert successful_task.states == ['start', 'success']
         assert failing_task.states == ['start', 'failure']
         assert task_with_inputs.states == ['start', 'failure']
         assert isinstance(failing_task.exception, MockException)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/9d0f37ec/tests/storage/test_instrumentation.py
----------------------------------------------------------------------
diff --git a/tests/storage/test_instrumentation.py b/tests/storage/test_instrumentation.py
index 24c4204..cfea963 100644
--- a/tests/storage/test_instrumentation.py
+++ b/tests/storage/test_instrumentation.py
@@ -36,7 +36,6 @@ Value = instrumentation._Value
 instruments_holder = []
 
 
-# TODO: add testing for new instances
 class TestInstrumentation(object):
 
     def test_track_changes(self, storage):
@@ -289,8 +288,7 @@ class TestInstrumentation(object):
         model_instance_1 = MockModel1(**model_kwargs)
         model_instance_2 = MockModel2(**model_kwargs)
 
-        ctx = MockContext(storage)
-        instrument = self._track_changes(ctx=ctx, instrumented_new=(MockModel1, ))
+        instrument = self._track_changes(model=storage, instrumented_new=(MockModel1,))
         assert not instrument.tracked_changes
 
         storage.mock_model_1.put(model_instance_1)
@@ -307,9 +305,9 @@ class TestInstrumentation(object):
         for key in model_kwargs:
             assert mock_model_1[key] == model_kwargs[key] == getattr(storage_model1_instance, key)
 
-    def _track_changes(self, instrumented_modified=None, ctx=None, instrumented_new=None):
+    def _track_changes(self, instrumented_modified=None, model=None, instrumented_new=None):
         instrument = instrumentation.track_changes(
-            ctx=ctx,
+            model=model,
             instrumented={'modified': instrumented_modified or {}, 'new': instrumented_new or {}})
         instruments_holder.append(instrument)
         return instrument


Mime
View raw message