ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dankil...@apache.org
Subject incubator-ariatosca git commit: change, fix and add some
Date Thu, 10 Nov 2016 14:36:06 GMT
Repository: incubator-ariatosca
Updated Branches:
  refs/heads/ARIA-11-wf-cancel a4b7894a5 -> 391d6fc5e


change, fix and add some


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/391d6fc5
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/391d6fc5
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/391d6fc5

Branch: refs/heads/ARIA-11-wf-cancel
Commit: 391d6fc5e54ed9da6b44dd677cd230c42c373c89
Parents: a4b7894
Author: Dan Kilman <dank@gigaspaces.com>
Authored: Thu Nov 10 13:52:05 2016 +0200
Committer: Dan Kilman <dank@gigaspaces.com>
Committed: Thu Nov 10 15:38:04 2016 +0200

----------------------------------------------------------------------
 aria/context/workflow.py                     | 26 ++++++++++-------
 aria/events/__init__.py                      |  2 +-
 aria/events/builtin_event_handler.py         |  4 +--
 aria/events/workflow_engine_event_handler.py |  4 +--
 aria/storage/models.py                       |  2 +-
 aria/storage/structures.py                   |  4 +++
 aria/tools/validation.py                     | 11 -------
 aria/workflows/core/engine.py                |  9 ++----
 tests/context/test_workflow.py               |  3 ++
 tests/workflows/core/test_engine.py          | 35 ++++++++++++-----------
 10 files changed, 49 insertions(+), 51 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/context/workflow.py b/aria/context/workflow.py
index 9c5abdc..329757a 100644
--- a/aria/context/workflow.py
+++ b/aria/context/workflow.py
@@ -61,20 +61,12 @@ class WorkflowContext(logger.LoggerMixin):
         self.parameters = parameters or {}
         self.task_max_retries = task_max_retries
         self.task_retry_interval = task_retry_interval
-
+        # TODO: execution creation should happen somewhere else
+        # should be moved there, when such logical place exists
         try:
             self.model.execution.get(self.execution_id)
         except exceptions.StorageError:
-            execution_cls = self.model.execution.model_cls
-            execution = self.model.execution.model_cls(
-                id=self.execution_id,
-                deployment_id=self.deployment_id,
-                workflow_id=self.workflow_id,
-                blueprint_id=self.blueprint_id,
-                status=execution_cls.PENDING,
-                parameters=self.parameters,
-            )
-            self.model.execution.store(execution)
+            self._create_execution()
 
     def __repr__(self):
         return (
@@ -83,6 +75,18 @@ class WorkflowContext(logger.LoggerMixin):
             'execution_id={self.execution_id})'.format(
                 name=self.__class__.__name__, self=self))
 
+    def _create_execution(self):
+        execution_cls = self.model.execution.model_cls
+        execution = self.model.execution.model_cls(
+            id=self.execution_id,
+            deployment_id=self.deployment_id,
+            workflow_id=self.workflow_id,
+            blueprint_id=self.blueprint_id,
+            status=execution_cls.PENDING,
+            parameters=self.parameters,
+        )
+        self.model.execution.store(execution)
+
     @property
     def blueprint_id(self):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/events/__init__.py
----------------------------------------------------------------------
diff --git a/aria/events/__init__.py b/aria/events/__init__.py
index a158308..2e88733 100644
--- a/aria/events/__init__.py
+++ b/aria/events/__init__.py
@@ -47,7 +47,7 @@ on_failure_task_signal = signal('failure_task_signal')
 # workflow engine workflow signals:
 start_workflow_signal = signal('start_workflow_signal')
 on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
-on_cancel_workflow_signal = signal('on_cancel_workflow_signal')
+on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
 on_success_workflow_signal = signal('on_success_workflow_signal')
 on_failure_workflow_signal = signal('on_failure_workflow_signal')
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/events/builtin_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py
index 11ee744..3d73f2b 100644
--- a/aria/events/builtin_event_handler.py
+++ b/aria/events/builtin_event_handler.py
@@ -30,7 +30,7 @@ from . import (
     start_workflow_signal,
     on_success_workflow_signal,
     on_failure_workflow_signal,
-    on_cancel_workflow_signal,
+    on_cancelled_workflow_signal,
     on_cancelling_workflow_signal,
     sent_task_signal,
     start_task_signal,
@@ -96,7 +96,7 @@ def _workflow_succeeded(workflow_context, *args, **kwargs):
     workflow_context.execution = execution
 
 
-@on_cancel_workflow_signal.connect
+@on_cancelled_workflow_signal.connect
 def _workflow_cancelled(workflow_context, *args, **kwargs):
     execution = workflow_context.execution
     execution.status = execution.CANCELLED

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/events/workflow_engine_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py
index 4067674..7df11d1 100644
--- a/aria/events/workflow_engine_event_handler.py
+++ b/aria/events/workflow_engine_event_handler.py
@@ -28,7 +28,7 @@ from . import (
     start_workflow_signal,
     on_success_workflow_signal,
     on_failure_workflow_signal,
-    on_cancel_workflow_signal,
+    on_cancelled_workflow_signal,
     on_cancelling_workflow_signal,
 )
 
@@ -64,7 +64,7 @@ def _success_workflow_handler(context, **kwargs):
     context.logger.debug('Event: Workflow success: {context.name}'.format(context=context))
 
 
-@on_cancel_workflow_signal.connect
+@on_cancelled_workflow_signal.connect
 def _cancel_workflow_handler(context, **kwargs):
     context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context))
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/storage/models.py
----------------------------------------------------------------------
diff --git a/aria/storage/models.py b/aria/storage/models.py
index eb943aa..a63f0e9 100644
--- a/aria/storage/models.py
+++ b/aria/storage/models.py
@@ -207,11 +207,11 @@ class Execution(Model):
     deployment_id = Field(type=basestring)
     workflow_id = Field(type=basestring)
     blueprint_id = Field(type=basestring)
+    created_at = Field(type=datetime, default=datetime.utcnow)
     started_at = Field(type=datetime, default=None)
     ended_at = Field(type=datetime, default=None)
     error = Field(type=basestring, default=None)
     parameters = Field()
-    is_system_workflow = Field(type=bool, default=False)
 
 
 class Relationship(Model):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/storage/structures.py
----------------------------------------------------------------------
diff --git a/aria/storage/structures.py b/aria/storage/structures.py
index c692d36..9a1524a 100644
--- a/aria/storage/structures.py
+++ b/aria/storage/structures.py
@@ -72,6 +72,7 @@ class Field(ValidatorMixin):
             self,
             type=None,
             choices=(),
+            validation_func=None,
             default=NO_DEFAULT,
             **kwargs):
         """
@@ -85,6 +86,7 @@ class Field(ValidatorMixin):
         self.type = type
         self.choices = choices
         self.default = default
+        self.validation_func = validation_func
         super(Field, self).__init__(**kwargs)
 
     def __get__(self, instance, owner):
@@ -120,6 +122,8 @@ class Field(ValidatorMixin):
             self.validate_instance(name, value, self.type)
         if self.choices:
             self.validate_in_choice(name, value, self.choices)
+        if self.validation_func:
+            pass
 
     def _field_name(self, instance):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/tools/validation.py
----------------------------------------------------------------------
diff --git a/aria/tools/validation.py b/aria/tools/validation.py
index ea1dae7..a33f7a2 100644
--- a/aria/tools/validation.py
+++ b/aria/tools/validation.py
@@ -24,20 +24,9 @@ class ValidatorMixin(object):
     """
 
     _ARGUMENT_TYPE_MESSAGE = '{name} argument must be {type} based, got {arg!r}'
-    _ACTION_MESSAGE = 'action arg options: {actions}, got {action}'
     _ARGUMENT_CHOICE_MESSAGE = '{name} argument must be in {choices}, got {arg!r}'
 
     @classmethod
-    def validate_actions(cls, action):
-        """
-        Validate action is defined in the class ``ACTIONS`` attribute
-        """
-        # todo: remove this and use validate choice
-        if action not in cls.ACTIONS:
-            raise TypeError(cls._ACTION_MESSAGE.format(
-                actions=cls.ACTIONS, action=action))
-
-    @classmethod
     def validate_in_choice(cls, name, argument, choices):
         """
         Validate ``argument`` is in ``choices``

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/aria/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/workflows/core/engine.py b/aria/workflows/core/engine.py
index c4a049c..26354eb 100644
--- a/aria/workflows/core/engine.py
+++ b/aria/workflows/core/engine.py
@@ -58,22 +58,19 @@ class Engine(logger.LoggerMixin):
                 else:
                     time.sleep(0.1)
             if self._is_cancelling():
-                events.on_cancel_workflow_signal.send(self._workflow_context)
+                events.on_cancelled_workflow_signal.send(self._workflow_context)
             else:
                 events.on_success_workflow_signal.send(self._workflow_context)
         except BaseException as e:
             events.on_failure_workflow_signal.send(self._workflow_context, exception=e)
             raise
 
-    def cancel_request(self):
+    def cancel_execution(self):
         """
         Send a cancel request to the engine. If execution already started, execution status
         will be modified to 'cancelling' status. If execution is in pending mode, execution status
-        will be modified to 'cancelled' directly. If execution is in one if its ended state, an
-        AriaEngineError will be raised
+        will be modified to 'cancelled' directly.
         """
-        if self._workflow_context.execution.status not in models.Execution.ACTIVE_STATES:
-            raise exceptions.AriaEngineError('Execution already ended')
         events.on_cancelling_workflow_signal.send(self._workflow_context)
 
     def _is_cancelling(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/tests/context/test_workflow.py
----------------------------------------------------------------------
diff --git a/tests/context/test_workflow.py b/tests/context/test_workflow.py
index 2e19aa2..b40a66d 100644
--- a/tests/context/test_workflow.py
+++ b/tests/context/test_workflow.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from datetime import datetime
+
 import pytest
 
 from aria import context, application_model_storage
@@ -32,6 +34,7 @@ class TestWorkflowContext(object):
         assert execution.blueprint_id == models.BLUEPRINT_ID
         assert execution.status == storage.execution.model_cls.PENDING
         assert execution.parameters == {}
+        assert execution.created_at <= datetime.utcnow()
 
     def test_subsequent_workflow_context_creation_do_not_fail(self, storage):
         self._create_ctx(storage)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/391d6fc5/tests/workflows/core/test_engine.py
----------------------------------------------------------------------
diff --git a/tests/workflows/core/test_engine.py b/tests/workflows/core/test_engine.py
index 748281e..3ddbaf0 100644
--- a/tests/workflows/core/test_engine.py
+++ b/tests/workflows/core/test_engine.py
@@ -94,7 +94,7 @@ class BaseTest(object):
         events.start_workflow_signal.connect(start_workflow_handler)
         events.on_success_workflow_signal.connect(success_workflow_handler)
         events.on_failure_workflow_signal.connect(failure_workflow_handler)
-        events.on_cancel_workflow_signal.connect(cancel_workflow_handler)
+        events.on_cancelled_workflow_signal.connect(cancel_workflow_handler)
         events.sent_task_signal.connect(sent_task_handler)
         try:
             yield
@@ -102,7 +102,7 @@ class BaseTest(object):
             events.start_workflow_signal.disconnect(start_workflow_handler)
             events.on_success_workflow_signal.disconnect(success_workflow_handler)
             events.on_failure_workflow_signal.disconnect(failure_workflow_handler)
-            events.on_cancel_workflow_signal.disconnect(cancel_workflow_handler)
+            events.on_cancelled_workflow_signal.disconnect(cancel_workflow_handler)
             events.sent_task_signal.disconnect(sent_task_handler)
 
     @pytest.fixture(scope='function')
@@ -152,6 +152,10 @@ class TestEngine(BaseTest):
         assert workflow_context.states == ['start', 'success']
         assert workflow_context.exception is None
         assert 'sent_task_signal_calls' not in global_test_holder
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is None
+        assert execution.status == models.Execution.TERMINATED
 
     def test_single_task_successful_execution(self, workflow_context, executor):
         @workflow
@@ -177,6 +181,10 @@ class TestEngine(BaseTest):
         assert workflow_context.states == ['start', 'failure']
         assert isinstance(workflow_context.exception, exceptions.ExecutorException)
         assert global_test_holder.get('sent_task_signal_calls') == 1
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is not None
+        assert execution.status == models.Execution.FAILED
 
     def test_two_tasks_execution_order(self, workflow_context, executor):
         @workflow
@@ -225,22 +233,16 @@ class TestEngine(BaseTest):
         t = threading.Thread(target=eng.execute)
         t.start()
         time.sleep(1)
-        eng.cancel_request()
+        eng.cancel_execution()
         t.join(timeout=30)
         assert workflow_context.states == ['start', 'cancel']
         assert workflow_context.exception is None
         invocations = global_test_holder.get('invocations', [])
         assert 0 < len(invocations) < number_of_tasks
-
-    def test_invalid_cancel_ended_execution(self, workflow_context, executor):
-        @workflow
-        def mock_workflow(**_):
-            pass
-        eng = self._execute(workflow_func=mock_workflow,
-                            workflow_context=workflow_context,
-                            executor=executor)
-        with pytest.raises(exceptions.AriaEngineError):
-            eng.cancel_request()
+        execution = workflow_context.execution
+        assert execution.started_at <= execution.ended_at <= datetime.utcnow()
+        assert execution.error is None
+        assert execution.status == models.Execution.CANCELLED
 
     def test_cancel_pending_execution(self, workflow_context, executor):
         @workflow
@@ -249,10 +251,9 @@ class TestEngine(BaseTest):
         eng = self._engine(workflow_func=mock_workflow,
                            workflow_context=workflow_context,
                            executor=executor)
-        eng.cancel_request()
-        # sanity to verify previous cancel request actually did something
-        with pytest.raises(exceptions.AriaEngineError):
-            eng.cancel_request()
+        eng.cancel_execution()
+        execution = workflow_context.execution
+        assert execution.status == models.Execution.CANCELLED
 
 
 class TestRetries(BaseTest):


Mime
View raw message