ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dankil...@apache.org
Subject incubator-ariatosca git commit: ARIA-26 TBD
Date Mon, 28 Nov 2016 14:17:08 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-26-plugin-mechanism [created] ad9cab78d


ARIA-26 TBD


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/ad9cab78
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/ad9cab78
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/ad9cab78

Branch: refs/heads/ARIA-26-plugin-mechanism
Commit: ad9cab78ddaca8c259915497b6c1514daed93a43
Parents: b33c70e
Author: Dan Kilman <dank@gigaspaces.com>
Authored: Sun Nov 27 16:31:29 2016 +0200
Committer: Dan Kilman <dank@gigaspaces.com>
Committed: Mon Nov 28 16:16:14 2016 +0200

----------------------------------------------------------------------
 aria/cli/commands.py                            |  4 +-
 .../orchestrator/workflows/executor/__init__.py |  2 +-
 .../orchestrator/workflows/executor/blocking.py | 36 ------------
 .../workflows/executor/multiprocess.py          | 61 ++++++++++----------
 aria/orchestrator/workflows/executor/thread.py  |  3 +-
 aria/storage/models.py                          |  2 +-
 aria/utils/plugin.py                            | 21 +++++++
 requirements.txt                                |  1 +
 tests/orchestrator/context/test_toolbelt.py     |  1 +
 .../orchestrator/workflows/core/test_engine.py  |  8 +--
 .../workflows/executor/test_executor.py         |  2 -
 11 files changed, 63 insertions(+), 78 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index 57118a7..ab035ba 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -31,7 +31,7 @@ from ..logger import LoggerMixin
 from ..storage import (FileSystemModelDriver, FileSystemResourceDriver)
 from ..orchestrator.context.workflow import WorkflowContext
 from ..orchestrator.workflows.core.engine import Engine
-from ..orchestrator.workflows.executor.thread import ThreadExecutor
+from ..orchestrator.workflows.executor.multiprocess import MultiprocessExecutor
 from ..parser import (DSL_SPECIFICATION_PACKAGES, iter_specifications)
 from ..parser.consumption import (
     ConsumptionContext,
@@ -248,7 +248,7 @@ class ExecuteCommand(BaseCommand):
         )
         workflow_function = self._load_workflow_handler(workflow['operation'])
         tasks_graph = workflow_function(workflow_context, **workflow_context.parameters)
-        executor = ThreadExecutor()
+        executor = MultiprocessExecutor()
         workflow_engine = Engine(executor=executor,
                                  workflow_context=workflow_context,
                                  tasks_graph=tasks_graph)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/aria/orchestrator/workflows/executor/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/__init__.py b/aria/orchestrator/workflows/executor/__init__.py
index 16b6c9b..7b205c1 100644
--- a/aria/orchestrator/workflows/executor/__init__.py
+++ b/aria/orchestrator/workflows/executor/__init__.py
@@ -18,5 +18,5 @@ Executors for task execution
 """
 
 
-from . import blocking, multiprocess, thread
+from . import multiprocess, thread
 from .base import BaseExecutor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/aria/orchestrator/workflows/executor/blocking.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/blocking.py b/aria/orchestrator/workflows/executor/blocking.py
deleted file mode 100644
index 9d3a9ba..0000000
--- a/aria/orchestrator/workflows/executor/blocking.py
+++ /dev/null
@@ -1,36 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Blocking executor
-"""
-
-from aria.utils import imports
-from .base import BaseExecutor
-
-
-class CurrentThreadBlockingExecutor(BaseExecutor):
-    """
-    Executor which runs tasks in the current thread (blocking)
-    """
-
-    def execute(self, task):
-        self._task_started(task)
-        try:
-            task_func = imports.load_attribute(task.operation_mapping)
-            task_func(ctx=task.context, **task.inputs)
-            self._task_succeeded(task)
-        except BaseException as e:
-            self._task_failed(task, exception=e)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/aria/orchestrator/workflows/executor/multiprocess.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/multiprocess.py b/aria/orchestrator/workflows/executor/multiprocess.py
index d770e07..0537381 100644
--- a/aria/orchestrator/workflows/executor/multiprocess.py
+++ b/aria/orchestrator/workflows/executor/multiprocess.py
@@ -17,6 +17,7 @@
 Multiprocess based executor
 """
 
+import collections
 import multiprocessing
 import threading
 
@@ -26,6 +27,11 @@ from aria.utils import imports
 from .base import BaseExecutor
 
 
+_TaskStarted = collections.namedtuple('_TaskStarted', 'task_id')
+_TaskSucceeded = collections.namedtuple('_TaskSucceeded', 'task_id')
+_TaskFailed = collections.namedtuple('_TaskFailed', 'task_id exception')
+
+
 class MultiprocessExecutor(BaseExecutor):
     """
     Executor which runs tasks in a multiprocess environment
@@ -40,59 +46,52 @@ class MultiprocessExecutor(BaseExecutor):
         self._listener_thread = threading.Thread(target=self._listener)
         self._listener_thread.daemon = True
         self._listener_thread.start()
-        self._pool = multiprocessing.Pool(processes=pool_size)
-
-    def execute(self, task):
-        self._tasks[task.id] = task
-        self._pool.apply_async(_multiprocess_handler, args=(
-            self._queue,
-            task.context,
-            task.id,
-            task.operation_mapping,
-            task.inputs))
+        self._pool = multiprocessing.Pool(processes=pool_size, maxtasksperchild=1)
 
     def close(self):
         self._pool.close()
         self._stopped = True
         self._pool.join()
+        self._manager.shutdown()
+        self._manager.join()
         self._listener_thread.join()
 
+    def execute(self, task):
+        self._tasks[task.id] = task
+        self._pool.apply_async(_handler, kwds={
+            'queue': self._queue,
+            'ctx': task.context,
+            'task_id': task.id,
+            'operation_mapping': task.operation_mapping,
+            'operation_inputs': task.inputs
+        })
+
+    def _remove_task(self, task_id):
+        return self._tasks.pop(task_id)
+
     def _listener(self):
         while not self._stopped:
             try:
                 message = self._queue.get(timeout=1)
-                if message.type == 'task_started':
+                if isinstance(message, _TaskStarted):
                     self._task_started(self._tasks[message.task_id])
-                elif message.type == 'task_succeeded':
+                elif isinstance(message, _TaskSucceeded):
                     self._task_succeeded(self._remove_task(message.task_id))
-                elif message.type == 'task_failed':
+                elif isinstance(message, _TaskFailed):
                     self._task_failed(self._remove_task(message.task_id),
                                       exception=jsonpickle.loads(message.exception))
                 else:
-                    # TODO: something
-                    raise RuntimeError()
+                    raise RuntimeError('Invalid state')
             # Daemon threads
             except BaseException:
                 pass
 
-    def _remove_task(self, task_id):
-        return self._tasks.pop(task_id)
-
-
-class _MultiprocessMessage(object):
-
-    def __init__(self, type, task_id, exception=None):
-        self.type = type
-        self.task_id = task_id
-        self.exception = exception
-
 
-def _multiprocess_handler(queue, ctx, task_id, operation_mapping, operation_inputs):
-    queue.put(_MultiprocessMessage(type='task_started', task_id=task_id))
+def _handler(queue, ctx, task_id, operation_mapping, operation_inputs):
+    queue.put(_TaskStarted(task_id))
     try:
         task_func = imports.load_attribute(operation_mapping)
         task_func(ctx=ctx, **operation_inputs)
-        queue.put(_MultiprocessMessage(type='task_succeeded', task_id=task_id))
+        queue.put(_TaskSucceeded(task_id))
     except BaseException as e:
-        queue.put(_MultiprocessMessage(type='task_failed', task_id=task_id,
-                                       exception=jsonpickle.dumps(e)))
+        queue.put(_TaskFailed(task_id, exception=jsonpickle.dumps(e)))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 76ceefd..16a58fb 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -26,7 +26,8 @@ from .base import BaseExecutor
 
 class ThreadExecutor(BaseExecutor):
     """
-    Executor which runs tasks in a separate thread
+    Executor which runs tasks in a separate thread. It's easier writing tests
+    using this executor rather than the full blown multiprocessing executor.
     """
 
     def __init__(self, pool_size=1, *args, **kwargs):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index d24ad75..e587b94 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -441,4 +441,4 @@ class Task(Model):
     name = Field(type=basestring)
     operation_mapping = Field(type=basestring)
     actor = Field()
-    inputs = Field(type=dict, default=lambda: {})
+    inputs = Field(type=dict, default=dict)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/aria/utils/plugin.py
----------------------------------------------------------------------
diff --git a/aria/utils/plugin.py b/aria/utils/plugin.py
index bb2b974..9868c89 100644
--- a/aria/utils/plugin.py
+++ b/aria/utils/plugin.py
@@ -19,8 +19,13 @@ Contains utility methods that enable dynamic python code loading
 """
 
 import os
+import tempfile
+import subprocess
+import sys
 from importlib import import_module
 
+import wagon
+
 
 def plugin_installer(path, plugin_suffix, package=None, callback=None):
     """
@@ -37,3 +42,19 @@ def plugin_installer(path, plugin_suffix, package=None, callback=None):
         module = import_module(module_name)
         if callback:
             callback(module)
+
+
+def create(source, destination_dir):
+    return wagon.create(source=source, archive_destination_dir=destination_dir)
+
+
+def install(source, prefix):
+    with tempfile.NamedTemporaryFile() as constraint:
+        constraint.write(subprocess.check_output([sys.executable, '-m', 'pip', 'freeze']))
+        constraint.flush()
+        wagon.install(
+            source=source,
+            install_args='--prefix="{prefix}" --constraint="{constraint}"'.format(
+                prefix=prefix,
+                constraint=constraint.name)
+        )

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/requirements.txt
----------------------------------------------------------------------
diff --git a/requirements.txt b/requirements.txt
index e6d5393..770d416 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -23,3 +23,4 @@ Jinja2==2.8
 shortuuid==0.4.3
 CacheControl[filecache]==0.11.6
 clint==0.5.1
+wagon==0.5.0

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/tests/orchestrator/context/test_toolbelt.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/context/test_toolbelt.py b/tests/orchestrator/context/test_toolbelt.py
index 547e62b..2a6c349 100644
--- a/tests/orchestrator/context/test_toolbelt.py
+++ b/tests/orchestrator/context/test_toolbelt.py
@@ -152,6 +152,7 @@ def test_wrong_model_toolbelt():
     with pytest.raises(RuntimeError):
         context.toolbelt(None)
 
+
 @operation(toolbelt=True)
 def host_ip(toolbelt, **_):
     global_test_holder['host_ip'] = toolbelt.host_ip

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/tests/orchestrator/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/core/test_engine.py b/tests/orchestrator/workflows/core/test_engine.py
index 1b00bf6..99b31c6 100644
--- a/tests/orchestrator/workflows/core/test_engine.py
+++ b/tests/orchestrator/workflows/core/test_engine.py
@@ -79,14 +79,14 @@ class BaseTest(object):
             ignore_failure=ignore_failure
         )
 
-    @pytest.fixture(scope='function', autouse=True)
+    @pytest.fixture(autouse=True)
     def globals_cleanup(self):
         try:
             yield
         finally:
             global_test_holder.clear()
 
-    @pytest.fixture(scope='function', autouse=True)
+    @pytest.fixture(autouse=True)
     def signals_registration(self, ):
         def sent_task_handler(*args, **kwargs):
             calls = global_test_holder.setdefault('sent_task_signal_calls', 0)
@@ -119,7 +119,7 @@ class BaseTest(object):
             events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler)
             events.sent_task_signal.disconnect(sent_task_handler)
 
-    @pytest.fixture(scope='function')
+    @pytest.fixture
     def executor(self):
         result = thread.ThreadExecutor()
         try:
@@ -127,7 +127,7 @@ class BaseTest(object):
         finally:
             result.close()
 
-    @pytest.fixture(scope='function')
+    @pytest.fixture
     def workflow_context(self):
         model_storage = aria.application_model_storage(tests.storage.InMemoryModelDriver())
         model_storage.setup()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/ad9cab78/tests/orchestrator/workflows/executor/test_executor.py
----------------------------------------------------------------------
diff --git a/tests/orchestrator/workflows/executor/test_executor.py b/tests/orchestrator/workflows/executor/test_executor.py
index a425799..fcfe9e8 100644
--- a/tests/orchestrator/workflows/executor/test_executor.py
+++ b/tests/orchestrator/workflows/executor/test_executor.py
@@ -25,7 +25,6 @@ from aria.orchestrator import events
 from aria.orchestrator.workflows.executor import (
     thread,
     multiprocess,
-    blocking,
     # celery
 )
 
@@ -45,7 +44,6 @@ class TestExecutor(object):
         (thread.ThreadExecutor, {'pool_size': 2}),
         (multiprocess.MultiprocessExecutor, {'pool_size': 1}),
         (multiprocess.MultiprocessExecutor, {'pool_size': 2}),
-        (blocking.CurrentThreadBlockingExecutor, {}),
         # (celery.CeleryExecutor, {'app': app})
     ])
     def test_execute(self, executor_cls, executor_kwargs):


Mime
View raw message