Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id D3671200C7E for ; Mon, 8 May 2017 20:35:17 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id D20A1160BBF; Mon, 8 May 2017 18:35:17 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C215F160BCA for ; Mon, 8 May 2017 20:35:16 +0200 (CEST) Received: (qmail 65174 invoked by uid 500); 8 May 2017 18:35:15 -0000 Mailing-List: contact dev-help@ariatosca.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ariatosca.incubator.apache.org Delivered-To: mailing list dev@ariatosca.incubator.apache.org Received: (qmail 65061 invoked by uid 99); 8 May 2017 18:35:15 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd1-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 May 2017 18:35:15 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd1-us-west.apache.org (ASF Mail Server at spamd1-us-west.apache.org) with ESMTP id 30251C0755 for ; Mon, 8 May 2017 18:35:15 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd1-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -4.222 X-Spam-Level: X-Spam-Status: No, score=-4.222 tagged_above=-999 required=6.31 tests=[KAM_ASCII_DIVIDERS=0.8, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-0.001, SPF_PASS=-0.001] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd1-us-west.apache.org [10.40.0.7]) (amavisd-new, port 10024) with ESMTP id T6FgNOP7YxTt for ; Mon, 8 May 2017 18:35:13 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id D3E175FDFF for ; Mon, 8 May 2017 18:35:10 +0000 (UTC) Received: (qmail 64805 invoked by uid 99); 8 May 2017 18:35:09 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 08 May 2017 18:35:09 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id E53F5F1717; Mon, 8 May 2017 18:35:07 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: emblemparade@apache.org To: dev@ariatosca.incubator.apache.org Date: Mon, 08 May 2017 18:35:20 -0000 Message-Id: In-Reply-To: References: X-Mailer: ASF-Git Admin Mailer Subject: [14/15] incubator-ariatosca git commit: ARIA-214 Dry execution changes the state of non implemented operations archived-at: Mon, 08 May 2017 18:35:18 -0000 ARIA-214 Dry execution changes the state of non implemented operations Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/0ec23707 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/0ec23707 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/0ec23707 Branch: refs/heads/ARIA-148-extra-cli-commands Commit: 0ec237071ebdeb28cd2feabbc1b51854543d398d Parents: 3e1ed14 Author: max-orlov Authored: Sun May 7 16:12:56 2017 +0300 Committer: max-orlov Committed: Sun May 7 22:29:53 2017 +0300 ---------------------------------------------------------------------- aria/orchestrator/workflows/core/task.py | 3 --- aria/orchestrator/workflows/core/translation.py | 6 +---- aria/orchestrator/workflows/executor/base.py | 19 ++++++++------ aria/orchestrator/workflows/executor/celery.py | 2 +- aria/orchestrator/workflows/executor/dry.py | 26 ++++++++++---------- aria/orchestrator/workflows/executor/process.py | 2 +- aria/orchestrator/workflows/executor/thread.py | 2 +- tests/orchestrator/workflows/core/test_task.py | 7 ++---- .../workflows/executor/test_process_executor.py | 18 +------------- 9 files changed, 31 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index 78159c4..b3dfb3c 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -163,9 +163,6 @@ class OperationTask(BaseTask): self._task_id = task_model.id self._update_fields = None - def execute(self): - super(OperationTask, self).execute() - @contextmanager def _update(self): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/core/translation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/translation.py b/aria/orchestrator/workflows/core/translation.py index 0bbce90..fec108b 100644 --- a/aria/orchestrator/workflows/core/translation.py +++ b/aria/orchestrator/workflows/core/translation.py @@ -48,11 +48,7 @@ def build_execution_graph( execution_graph, dependencies, default=[start_task]) if isinstance(api_task, api.task.OperationTask): - if api_task.implementation: - operation_task = core_task.OperationTask(api_task, executor=default_executor) - else: - operation_task = core_task.OperationTask(api_task, - executor=base.EmptyOperationExecutor()) + operation_task = core_task.OperationTask(api_task, executor=default_executor) _add_task_and_dependencies(execution_graph, operation_task, operation_dependencies) elif isinstance(api_task, api.task.WorkflowTask): # Build the graph recursively while adding start and end markers http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/executor/base.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/base.py b/aria/orchestrator/workflows/executor/base.py index a225837..c543278 100644 --- a/aria/orchestrator/workflows/executor/base.py +++ b/aria/orchestrator/workflows/executor/base.py @@ -25,13 +25,22 @@ class BaseExecutor(logger.LoggerMixin): """ Base class for executors for running tasks """ + def _execute(self, task): + raise NotImplementedError def execute(self, task): """ Execute a task :param task: task to execute """ - raise NotImplementedError + if task.implementation: + self._execute(task) + else: + # In this case the task is missing an implementation. This task still gets to an + # executor, but since there is nothing to run, we by default simply skip the execution + # itself. + self._task_started(task) + self._task_succeeded(task) def close(self): """ @@ -52,12 +61,6 @@ class BaseExecutor(logger.LoggerMixin): events.on_success_task_signal.send(task) -class StubTaskExecutor(BaseExecutor): +class StubTaskExecutor(BaseExecutor): # pylint: disable=abstract-method def execute(self, task): task.status = task.SUCCESS - - -class EmptyOperationExecutor(BaseExecutor): - def execute(self, task): - events.start_task_signal.send(task) - events.on_success_task_signal.send(task) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/executor/celery.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py index 7bd9b7c..bbddc25 100644 --- a/aria/orchestrator/workflows/executor/celery.py +++ b/aria/orchestrator/workflows/executor/celery.py @@ -42,7 +42,7 @@ class CeleryExecutor(BaseExecutor): self._receiver_thread.start() self._started_queue.get(timeout=30) - def execute(self, task): + def _execute(self, task): self._tasks[task.id] = task inputs = dict(inp.unwrap() for inp in task.inputs.values()) inputs['ctx'] = task.context http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/executor/dry.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py index eb70a41..f6fb7a6 100644 --- a/aria/orchestrator/workflows/executor/dry.py +++ b/aria/orchestrator/workflows/executor/dry.py @@ -21,11 +21,10 @@ from datetime import datetime from .base import BaseExecutor -class DryExecutor(BaseExecutor): +class DryExecutor(BaseExecutor): # pylint: disable=abstract-method """ Executor which dry runs tasks - prints task information without causing any side effects """ - def execute(self, task): # updating the task manually instead of calling self._task_started(task), # to avoid any side effects raising that event might cause @@ -33,19 +32,20 @@ class DryExecutor(BaseExecutor): task.started_at = datetime.utcnow() task.status = task.STARTED - if hasattr(task.actor, 'source_node'): - name = '{source_node.name}->{target_node.name}'.format( - source_node=task.actor.source_node, target_node=task.actor.target_node) - else: - name = task.actor.name + if task.implementation: + if hasattr(task.actor, 'source_node'): + name = '{source_node.name}->{target_node.name}'.format( + source_node=task.actor.source_node, target_node=task.actor.target_node) + else: + name = task.actor.name - task.context.logger.info( - ' {name} {task.interface_name}.{task.operation_name} started...' - .format(name=name, task=task)) + task.context.logger.info( + ' {name} {task.interface_name}.{task.operation_name} started...' + .format(name=name, task=task)) - task.context.logger.info( - ' {name} {task.interface_name}.{task.operation_name} successful' - .format(name=name, task=task)) + task.context.logger.info( + ' {name} {task.interface_name}.{task.operation_name} successful' + .format(name=name, task=task)) # updating the task manually instead of calling self._task_succeeded(task), # to avoid any side effects raising that event might cause http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index f3daf04..e464f7d 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -116,7 +116,7 @@ class ProcessExecutor(base.BaseExecutor): self._server_socket.close() self._listener_thread.join(timeout=60) - def execute(self, task): + def _execute(self, task): self._check_closed() self._tasks[task.id] = task http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 836b2bf..f53362a 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -46,7 +46,7 @@ class ThreadExecutor(BaseExecutor): thread.start() self._pool.append(thread) - def execute(self, task): + def _execute(self, task): self._queue.put(task) def close(self): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/tests/orchestrator/workflows/core/test_task.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/core/test_task.py b/tests/orchestrator/workflows/core/test_task.py index 748ee20..50ca7f5 100644 --- a/tests/orchestrator/workflows/core/test_task.py +++ b/tests/orchestrator/workflows/core/test_task.py @@ -24,7 +24,6 @@ from aria.orchestrator.workflows import ( api, core, exceptions, - executor ) from tests import mock, storage @@ -71,8 +70,7 @@ class TestOperationTask(object): node, interface_name=NODE_INTERFACE_NAME, operation_name=NODE_OPERATION_NAME) - core_task = core.task.OperationTask(api_task=api_task, - executor=executor.base.EmptyOperationExecutor()) + core_task = core.task.OperationTask(api_task=api_task, executor=None) return api_task, core_task def _create_relationship_operation_task(self, ctx, relationship): @@ -81,8 +79,7 @@ class TestOperationTask(object): relationship, interface_name=RELATIONSHIP_INTERFACE_NAME, operation_name=RELATIONSHIP_OPERATION_NAME) - core_task = core.task.OperationTask(api_task=api_task, - executor=executor.base.EmptyOperationExecutor()) + core_task = core.task.OperationTask(api_task=api_task, executor=None) return api_task, core_task def test_node_operation_task_creation(self, ctx): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/0ec23707/tests/orchestrator/workflows/executor/test_process_executor.py ---------------------------------------------------------------------- diff --git a/tests/orchestrator/workflows/executor/test_process_executor.py b/tests/orchestrator/workflows/executor/test_process_executor.py index b353518..5f240b2 100644 --- a/tests/orchestrator/workflows/executor/test_process_executor.py +++ b/tests/orchestrator/workflows/executor/test_process_executor.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -import logging import os import Queue @@ -66,7 +65,7 @@ class TestProcessExecutor(object): def test_closed(self, executor): executor.close() with pytest.raises(RuntimeError) as exc_info: - executor.execute(task=None) + executor.execute(task=MockTask(implementation='some.implementation')) assert 'closed' in exc_info.value.message @@ -82,18 +81,3 @@ def mock_plugin(plugin_manager, tmpdir): source = os.path.join(tests.resources.DIR, 'plugins', 'mock-plugin1') plugin_path = create_plugin(source=source, destination_dir=str(tmpdir)) return plugin_manager.install(source=plugin_path) - - -class MockContext(object): - - def __init__(self, *args, **kwargs): - self.logger = logging.getLogger('mock_logger') - self.task = type('SubprocessMockTask', (object, ), {'plugin': None}) - self.serialization_dict = {'context_cls': self.__class__, 'context': {}} - - def __getattr__(self, item): - return None - - @classmethod - def deserialize_from_dict(cls, **kwargs): - return cls()