ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: removed sleep, and added some complexity to the caching mechanism [Forced Update!]
Date Mon, 13 Mar 2017 10:32:15 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-117-Log-model-should-have-an-Task-field 90a4f3c48 -> a95e2f9fe (forced
update)


removed sleep, and added some complexity to the caching mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/a95e2f9f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/a95e2f9f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/a95e2f9f

Branch: refs/heads/ARIA-117-Log-model-should-have-an-Task-field
Commit: a95e2f9fe83fd751ba3f95717d14bf9009278387
Parents: 62f0141
Author: max-orlov <maxim@gigaspaces.com>
Authored: Sun Mar 12 16:32:08 2017 +0200
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Mon Mar 13 12:32:09 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/context/operation.py                | 14 ++++++++++----
 aria/orchestrator/execution_plugin/common.py          |  8 +++-----
 .../orchestrator/execution_plugin/ctx_proxy/server.py |  6 +++++-
 aria/orchestrator/execution_plugin/local.py           |  3 +--
 aria/orchestrator/execution_plugin/ssh/operations.py  |  3 +--
 aria/orchestrator/workflows/executor/thread.py        |  4 ----
 tests/orchestrator/execution_plugin/test_local.py     |  3 ++-
 7 files changed, 22 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index ed0791c..d2716e8 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -17,6 +17,8 @@
 Workflow and operation contexts
 """
 
+import threading
+
 import aria
 from aria.utils import file
 from .common import BaseContext
@@ -44,7 +46,7 @@ class BaseOperationContext(BaseContext):
             **kwargs)
         self._task_id = task_id
         self._actor_id = actor_id
-        self._task = None
+        self._thread_local = threading.local()
         self._execution_id = execution_id
         self._register_logger(task_id=self.task.id)
 
@@ -64,9 +66,13 @@ class BaseOperationContext(BaseContext):
         The task in the model storage
         :return: Task model
         """
-        if not self._task:
-            self._task = self.model.task.get(self._task_id)
-        return self._task
+        # SQLAlchemy prevents from accessing an object which was created on a different thread.
+        # So we retrieve the object from the storage if the current thread isn't the same
as the
+        # original thread.
+
+        if not hasattr(self._thread_local, 'task'):
+            self._thread_local.task = self.model.task.get(self._task_id)
+        return self._thread_local.task
 
     @property
     def plugin_workdir(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/execution_plugin/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/common.py b/aria/orchestrator/execution_plugin/common.py
index 47cb631..7915c47 100644
--- a/aria/orchestrator/execution_plugin/common.py
+++ b/aria/orchestrator/execution_plugin/common.py
@@ -106,8 +106,6 @@ def create_process_config(script_path, process, operation_kwargs, quote_json_env
 def patch_ctx(ctx):
     ctx._error = None
     task = ctx.task
-    task._original_abort = task.abort
-    task._original_retry = task.retry
 
     def _validate_legal_action():
         if ctx._error is not None:
@@ -133,14 +131,14 @@ def check_error(ctx, error_check_func=None, reraise=False):
     _error = ctx._error
     # this happens when a script calls task.abort/task.retry more than once
     if isinstance(_error, RuntimeError):
-        ctx.task._original_abort(str(_error))
+        ctx.task.abort(str(_error))
     # ScriptException is populated by the ctx proxy server when task.abort or task.retry
     # are called
     elif isinstance(_error, exceptions.ScriptException):
         if _error.retry:
-            ctx.task._original_retry(_error.message, _error.retry_interval)
+            ctx.task.retry(_error.message, _error.retry_interval)
         else:
-            ctx.task._original_abort(_error.message)
+            ctx.task.abort(_error.message)
     # local and ssh operations may pass an additional logic check for errors here
     if error_check_func:
         error_check_func()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 2782ae3..817d064 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -30,8 +30,9 @@ from .. import exceptions
 
 class CtxProxy(object):
 
-    def __init__(self, ctx):
+    def __init__(self, ctx, ctx_patcher=(lambda *args, **kwargs: None)):
         self.ctx = ctx
+        self._ctx_patcher = ctx_patcher
         self.port = _get_unused_port()
         self.socket_url = 'http://localhost:{0}'.format(self.port)
         self.server = None
@@ -69,6 +70,9 @@ class CtxProxy(object):
                 server.serve_forever(poll_interval=0.1)
 
         def serve():
+            # Since task is a thread_local object, we need to patch it inside the server
thread.
+            self._ctx_patcher(self.ctx)
+
             bottle_app = bottle.Bottle()
             bottle_app.post('/', callback=self._request_handler)
             bottle.run(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/execution_plugin/local.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/local.py b/aria/orchestrator/execution_plugin/local.py
index bc2d661..121e582 100644
--- a/aria/orchestrator/execution_plugin/local.py
+++ b/aria/orchestrator/execution_plugin/local.py
@@ -75,8 +75,7 @@ def _execute_func(script_path, ctx, process, operation_kwargs):
     env = os.environ.copy()
     env.update(process['env'])
     ctx.logger.info('Executing: {0}'.format(command))
-    common.patch_ctx(ctx)
-    with ctx_proxy.server.CtxProxy(ctx) as proxy:
+    with ctx_proxy.server.CtxProxy(ctx, common.patch_ctx) as proxy:
         env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url
         running_process = subprocess.Popen(
             command,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/execution_plugin/ssh/operations.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ssh/operations.py b/aria/orchestrator/execution_plugin/ssh/operations.py
index 7589d42..f240beb 100644
--- a/aria/orchestrator/execution_plugin/ssh/operations.py
+++ b/aria/orchestrator/execution_plugin/ssh/operations.py
@@ -70,14 +70,13 @@ def run_script(ctx, script_path, fabric_env, process, use_sudo, hide_output,
**k
                                                                  paths.remote_work_dir))
             # this file has to be present before using ctx
             fabric.api.put(_PROXY_CLIENT_PATH, paths.remote_ctx_path)
-        _patch_ctx(ctx)
         process = common.create_process_config(
             script_path=paths.remote_script_path,
             process=process,
             operation_kwargs=kwargs,
             quote_json_env_vars=True)
         fabric.api.put(paths.local_script_path, paths.remote_script_path)
-        with ctx_proxy.server.CtxProxy(ctx) as proxy:
+        with ctx_proxy.server.CtxProxy(ctx, _patch_ctx) as proxy:
             local_port = proxy.port
             with fabric.context_managers.cd(process.get('cwd', paths.remote_work_dir)): 
# pylint: disable=not-context-manager
                 with tunnel.remote(ctx, local_port=local_port) as remote_port:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 16b22e3..6c59986 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -17,7 +17,6 @@
 Thread based executor
 """
 
-import time
 import Queue
 import threading
 
@@ -59,9 +58,6 @@ class ThreadExecutor(BaseExecutor):
                 self._task_started(task)
                 try:
                     task_func = imports.load_attribute(task.implementation)
-                    # Some of the changes (mainly the logs fail to propagate if not enough
time
-                    # is given
-                    time.sleep(0.1)
                     task_func(ctx=task.context, **task.inputs)
                     self._task_succeeded(task)
                 except BaseException as e:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/a95e2f9f/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 5224496..a94fc83 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -30,7 +30,8 @@ from aria.orchestrator.execution_plugin import constants
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator.workflows.core import engine
 
-from tests import mock, storage
+from tests import mock
+from tests import storage
 from tests.orchestrator.workflows.helpers import events_collector
 
 IS_WINDOWS = os.name == 'nt'


Mime
View raw message