ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject incubator-ariatosca git commit: removed sleep, and added some complexity to the caching mechanism [Forced Update!]
Date Sun, 12 Mar 2017 16:54:39 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-117-Log-model-should-have-an-Task-field 100957214 -> c32d05194 (forced update)


removed sleep, and added some complexity to the caching mechanism


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/c32d0519
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/c32d0519
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/c32d0519

Branch: refs/heads/ARIA-117-Log-model-should-have-an-Task-field
Commit: c32d051946c96c9da93aebb512fce179c9657e1d
Parents: 62f0141
Author: max-orlov <maxim@gigaspaces.com>
Authored: Sun Mar 12 16:32:08 2017 +0200
Committer: max-orlov <maxim@gigaspaces.com>
Committed: Sun Mar 12 18:54:32 2017 +0200

----------------------------------------------------------------------
 aria/orchestrator/context/operation.py                | 14 ++++++++++----
 aria/orchestrator/execution_plugin/common.py          |  6 +++---
 .../orchestrator/execution_plugin/ctx_proxy/server.py |  8 +++++++-
 aria/orchestrator/execution_plugin/local.py           |  1 -
 aria/orchestrator/workflows/executor/thread.py        |  4 ----
 tests/orchestrator/execution_plugin/test_local.py     |  7 +++++--
 6 files changed, 25 insertions(+), 15 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c32d0519/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index ed0791c..d2716e8 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -17,6 +17,8 @@
 Workflow and operation contexts
 """
 
+import threading
+
 import aria
 from aria.utils import file
 from .common import BaseContext
@@ -44,7 +46,7 @@ class BaseOperationContext(BaseContext):
             **kwargs)
         self._task_id = task_id
         self._actor_id = actor_id
-        self._task = None
+        self._thread_local = threading.local()
         self._execution_id = execution_id
         self._register_logger(task_id=self.task.id)
 
@@ -64,9 +66,13 @@ class BaseOperationContext(BaseContext):
         The task in the model storage
         :return: Task model
         """
-        if not self._task:
-            self._task = self.model.task.get(self._task_id)
-        return self._task
+        # SQLAlchemy prevents from accessing an object which was created on a different thread.
+        # So we retrieve the object from the storage if the current thread isn't the same as the
+        # original thread.
+
+        if not hasattr(self._thread_local, 'task'):
+            self._thread_local.task = self.model.task.get(self._task_id)
+        return self._thread_local.task
 
     @property
     def plugin_workdir(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c32d0519/aria/orchestrator/execution_plugin/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/common.py b/aria/orchestrator/execution_plugin/common.py
index 47cb631..7d33f2c 100644
--- a/aria/orchestrator/execution_plugin/common.py
+++ b/aria/orchestrator/execution_plugin/common.py
@@ -133,14 +133,14 @@ def check_error(ctx, error_check_func=None, reraise=False):
     _error = ctx._error
     # this happens when a script calls task.abort/task.retry more than once
     if isinstance(_error, RuntimeError):
-        ctx.task._original_abort(str(_error))
+        ctx.task.abort(str(_error))
     # ScriptException is populated by the ctx proxy server when task.abort or task.retry
     # are called
     elif isinstance(_error, exceptions.ScriptException):
         if _error.retry:
-            ctx.task._original_retry(_error.message, _error.retry_interval)
+            ctx.task.retry(_error.message, _error.retry_interval)
         else:
-            ctx.task._original_abort(_error.message)
+            ctx.task.abort(_error.message)
     # local and ssh operations may pass an additional logic check for errors here
     if error_check_func:
         error_check_func()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c32d0519/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 2782ae3..5423cbd 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -25,7 +25,10 @@ import wsgiref.simple_server
 
 import bottle
 
-from .. import exceptions
+from .. import (
+    exceptions,
+    common
+)
 
 
 class CtxProxy(object):
@@ -69,6 +72,9 @@ class CtxProxy(object):
                 server.serve_forever(poll_interval=0.1)
 
         def serve():
+            # Since task is a thread_local object, we need to patch it inside the server thread.
+            common.patch_ctx(self.ctx)
+
             bottle_app = bottle.Bottle()
             bottle_app.post('/', callback=self._request_handler)
             bottle.run(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c32d0519/aria/orchestrator/execution_plugin/local.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/local.py b/aria/orchestrator/execution_plugin/local.py
index bc2d661..2101d21 100644
--- a/aria/orchestrator/execution_plugin/local.py
+++ b/aria/orchestrator/execution_plugin/local.py
@@ -75,7 +75,6 @@ def _execute_func(script_path, ctx, process, operation_kwargs):
     env = os.environ.copy()
     env.update(process['env'])
     ctx.logger.info('Executing: {0}'.format(command))
-    common.patch_ctx(ctx)
     with ctx_proxy.server.CtxProxy(ctx) as proxy:
         env[ctx_proxy.client.CTX_SOCKET_URL] = proxy.socket_url
         running_process = subprocess.Popen(

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c32d0519/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 16b22e3..6c59986 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -17,7 +17,6 @@
 Thread based executor
 """
 
-import time
 import Queue
 import threading
 
@@ -59,9 +58,6 @@ class ThreadExecutor(BaseExecutor):
                 self._task_started(task)
                 try:
                     task_func = imports.load_attribute(task.implementation)
-                    # Some of the changes (mainly the logs fail to propagate if not enough time
-                    # is given
-                    time.sleep(0.1)
                     task_func(ctx=task.context, **task.inputs)
                     self._task_succeeded(task)
                 except BaseException as e:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c32d0519/tests/orchestrator/execution_plugin/test_local.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/execution_plugin/test_local.py b/tests/orchestrator/execution_plugin/test_local.py
index 5224496..a3dc8d9 100644
--- a/tests/orchestrator/execution_plugin/test_local.py
+++ b/tests/orchestrator/execution_plugin/test_local.py
@@ -18,7 +18,10 @@ import os
 
 import pytest
 
-from aria import workflow
+from aria import (
+    workflow,
+    storage
+)
 from aria.orchestrator import events
 from aria.orchestrator.workflows import api
 from aria.orchestrator.workflows.exceptions import ExecutorException
@@ -30,7 +33,7 @@ from aria.orchestrator.execution_plugin import constants
 from aria.orchestrator.workflows.executor import process
 from aria.orchestrator.workflows.core import engine
 
-from tests import mock, storage
+from tests import mock
 from tests.orchestrator.workflows.helpers import events_collector
 
 IS_WINDOWS = os.name == 'nt'


Mime
View raw message