ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject [4/4] incubator-ariatosca git commit: repository_ordering
Date Wed, 16 Nov 2016 09:16:23 GMT
repository_ordering


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/675b820f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/675b820f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/675b820f

Branch: refs/heads/ARIA-21-reorder-repository-sturcutre
Commit: 675b820f0ebec985532580633fcd84bf74ff4180
Parents: 8ee1470
Author: mxmrlv <mxmrlv@gmail.com>
Authored: Tue Nov 15 19:55:00 2016 +0200
Committer: mxmrlv <mxmrlv@gmail.com>
Committed: Wed Nov 16 11:16:03 2016 +0200

----------------------------------------------------------------------
 aria/__init__.py                                |  17 +-
 aria/cli/commands.py                            |  22 +-
 aria/context/__init__.py                        |  21 --
 aria/context/common.py                          | 147 ----------
 aria/context/operation.py                       | 127 --------
 aria/context/toolbelt.py                        |  75 -----
 aria/context/workflow.py                        | 119 --------
 aria/decorators.py                              |  73 -----
 aria/events/__init__.py                         |  57 ----
 aria/events/builtin_event_handler.py            | 123 --------
 aria/events/workflow_engine_event_handler.py    |  74 -----
 aria/exceptions.py                              |   7 -
 aria/orchestrator/__init__.py                   |  23 ++
 aria/orchestrator/context/__init__.py           |  21 ++
 aria/orchestrator/context/common.py             | 144 +++++++++
 aria/orchestrator/context/exceptions.py         |  23 ++
 aria/orchestrator/context/operation.py          | 127 ++++++++
 aria/orchestrator/context/toolbelt.py           |  75 +++++
 aria/orchestrator/context/workflow.py           | 113 ++++++++
 aria/orchestrator/decorators.py                 |  74 +++++
 aria/orchestrator/events/__init__.py            |  57 ++++
 .../events/builtin_event_handler.py             | 123 ++++++++
 .../events/workflow_engine_event_handler.py     |  74 +++++
 aria/orchestrator/exceptions.py                 |  20 ++
 aria/orchestrator/workflows/__init__.py         |  14 +
 aria/orchestrator/workflows/api/__init__.py     |  20 ++
 aria/orchestrator/workflows/api/task.py         | 171 +++++++++++
 aria/orchestrator/workflows/api/task_graph.py   | 290 +++++++++++++++++++
 aria/orchestrator/workflows/builtin/__init__.py |  31 ++
 .../workflows/builtin/execute_operation.py      | 104 +++++++
 aria/orchestrator/workflows/builtin/heal.py     | 174 +++++++++++
 aria/orchestrator/workflows/builtin/install.py  |  53 ++++
 .../orchestrator/workflows/builtin/uninstall.py |  52 ++++
 .../orchestrator/workflows/builtin/workflows.py | 215 ++++++++++++++
 aria/orchestrator/workflows/core/__init__.py    |  20 ++
 aria/orchestrator/workflows/core/engine.py      | 116 ++++++++
 aria/orchestrator/workflows/core/task.py        | 243 ++++++++++++++++
 aria/orchestrator/workflows/core/translation.py | 106 +++++++
 aria/orchestrator/workflows/exceptions.py       |  71 +++++
 .../orchestrator/workflows/executor/__init__.py |  21 ++
 aria/orchestrator/workflows/executor/base.py    |  54 ++++
 .../orchestrator/workflows/executor/blocking.py |  36 +++
 aria/orchestrator/workflows/executor/celery.py  |  97 +++++++
 .../workflows/executor/multiprocess.py          |  98 +++++++
 aria/orchestrator/workflows/executor/thread.py  |  65 +++++
 aria/storage/__init__.py                        |   2 +-
 aria/storage/drivers.py                         |   6 +-
 aria/storage/exceptions.py                      |  23 ++
 aria/storage/structures.py                      |   5 +-
 aria/tools/__init__.py                          |   6 +
 aria/tools/application.py                       |   4 +-
 aria/tools/process.py                           |   2 +-
 aria/workflows/__init__.py                      |  14 -
 aria/workflows/api/__init__.py                  |  20 --
 aria/workflows/api/task.py                      | 172 -----------
 aria/workflows/api/task_graph.py                | 290 -------------------
 aria/workflows/builtin/__init__.py              |  31 --
 aria/workflows/builtin/execute_operation.py     | 104 -------
 aria/workflows/builtin/heal.py                  | 174 -----------
 aria/workflows/builtin/install.py               |  53 ----
 aria/workflows/builtin/uninstall.py             |  52 ----
 aria/workflows/builtin/workflows.py             | 215 --------------
 aria/workflows/core/__init__.py                 |  20 --
 aria/workflows/core/engine.py                   | 114 --------
 aria/workflows/core/task.py                     | 242 ----------------
 aria/workflows/core/translation.py              | 106 -------
 aria/workflows/exceptions.py                    |  70 -----
 aria/workflows/executor/__init__.py             |  21 --
 aria/workflows/executor/base.py                 |  54 ----
 aria/workflows/executor/blocking.py             |  36 ---
 aria/workflows/executor/celery.py               |  97 -------
 aria/workflows/executor/multiprocess.py         |  98 -------
 aria/workflows/executor/thread.py               |  65 -----
 tests/context/__init__.py                       |   4 +-
 tests/context/test_operation.py                 |   6 +-
 tests/context/test_toolbelt.py                  |   9 +-
 tests/context/test_workflow.py                  |   3 +-
 tests/mock/context.py                           |   3 +-
 tests/storage/test_drivers.py                   |   2 +-
 tests/storage/test_model_storage.py             |   2 +-
 tests/storage/test_models.py                    |   2 +-
 tests/storage/test_models_api.py                |   2 +-
 tests/storage/test_resource_storage.py          |   2 +-
 tests/test_logger.py                            |   3 +-
 tests/workflows/api/test_task.py                |   4 +-
 tests/workflows/api/test_task_graph.py          |   2 +-
 .../workflows/builtin/test_execute_operation.py |   4 +-
 tests/workflows/builtin/test_heal.py            |   4 +-
 tests/workflows/builtin/test_install.py         |   4 +-
 tests/workflows/builtin/test_uninstall.py       |   4 +-
 tests/workflows/core/test_engine.py             |   8 +-
 tests/workflows/core/test_task.py               |   4 +-
 .../test_task_graph_into_exececution_graph.py   |   4 +-
 tests/workflows/executor/test_executor.py       |   4 +-
 94 files changed, 3022 insertions(+), 2941 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/__init__.py
----------------------------------------------------------------------
diff --git a/aria/__init__.py b/aria/__init__.py
index eca7b9b..b2d1157 100644
--- a/aria/__init__.py
+++ b/aria/__init__.py
@@ -18,15 +18,16 @@ Aria top level package
 """
 
 from .VERSION import version as __version__
-from .storage.drivers import (
-    ResourceDriver,
-    ModelDriver,
-    FileSystemModelDriver,
-    FileSystemResourceDriver,
-)
-from .storage import ModelStorage, ResourceStorage, models
-from .decorators import workflow, operation
 
+from .orchestrator.decorators import workflow, operation
+from .storage import ModelStorage, ResourceStorage, models, ModelDriver, ResourceDriver
+from . import (
+    tools,
+    parser,
+    storage,
+    orchestrator,
+    cli
+)
 __all__ = (
     '__version__',
     'workflow',

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/cli/commands.py
----------------------------------------------------------------------
diff --git a/aria/cli/commands.py b/aria/cli/commands.py
index d3698fd..2fff4f0 100644
--- a/aria/cli/commands.py
+++ b/aria/cli/commands.py
@@ -23,16 +23,22 @@ import sys
 from glob import glob
 from importlib import import_module
 
+from dsl_parser.parser import parse_from_path
+from dsl_parser.tasks import prepare_deployment_plan
 from yaml import safe_load, YAMLError
 
 from aria import application_model_storage, application_resource_storage
+from aria.orchestrator.context.workflow import WorkflowContext
 from aria.logger import LoggerMixin
 from aria.storage import FileSystemModelDriver, FileSystemResourceDriver
 from aria.tools.application import StorageManager
-from aria.context.workflow import WorkflowContext
-from aria.workflows.core.engine import Engine
-from aria.workflows.executor.thread import ThreadExecutor
-
+from aria.orchestrator.workflows.core.engine import Engine
+from aria.orchestrator.workflows.executor.thread import ThreadExecutor
+from .exceptions import (
+    AriaCliFormatInputsError,
+    AriaCliYAMLInputsError,
+    AriaCliInvalidInputsError
+)
 from .storage import (
     local_resource_storage,
     create_local_storage,
@@ -42,16 +48,8 @@ from .storage import (
     local_storage,
 )
 
-from .exceptions import (
-    AriaCliFormatInputsError,
-    AriaCliYAMLInputsError,
-    AriaCliInvalidInputsError
-)
 
 #######################################
-from dsl_parser.parser import parse_from_path
-from dsl_parser.tasks import prepare_deployment_plan
-#######################################
 
 
 class BaseCommand(LoggerMixin):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/context/__init__.py
----------------------------------------------------------------------
diff --git a/aria/context/__init__.py b/aria/context/__init__.py
deleted file mode 100644
index ad89b13..0000000
--- a/aria/context/__init__.py
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Provides contexts to workflow and operation
-"""
-
-from . import workflow, operation
-from .toolbelt import toolbelt

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/context/common.py
----------------------------------------------------------------------
diff --git a/aria/context/common.py b/aria/context/common.py
deleted file mode 100644
index 6e9b86a..0000000
--- a/aria/context/common.py
+++ /dev/null
@@ -1,147 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-A common context for both workflow and operation
-"""
-from uuid import uuid4
-
-from .. import (
-    logger,
-    exceptions,
-)
-from ..tools.lru_cache import lru_cache
-
-
-class BaseContext(logger.LoggerMixin):
-    """
-    Base context object for workflow and operation
-    """
-
-    def __init__(
-            self,
-            name,
-            model_storage,
-            resource_storage,
-            deployment_id,
-            workflow_id,
-            execution_id=None,
-            task_max_attempts=1,
-            task_retry_interval=0,
-            task_ignore_failure=False,
-            **kwargs):
-        super(BaseContext, self).__init__(**kwargs)
-        self._name = name
-        self._id = str(uuid4())
-        self._model = model_storage
-        self._resource = resource_storage
-        self._deployment_id = deployment_id
-        self._workflow_id = workflow_id
-        self._execution_id = execution_id or str(uuid4())
-        self._task_max_attempts = task_max_attempts
-        self._task_retry_interval = task_retry_interval
-        self._task_ignore_failure = task_ignore_failure
-
-    def __repr__(self):
-        return (
-            '{name}(name={self.name}, '
-            'deployment_id={self._deployment_id}, '
-            'workflow_id={self._workflow_id}, '
-            'execution_id={self._execution_id})'
-            .format(name=self.__class__.__name__, self=self))
-
-    @property
-    def model(self):
-        """
-        Access to the model storage
-        :return:
-        """
-        return self._model
-
-    @property
-    def resource(self):
-        """
-        Access to the resource storage
-        :return:
-        """
-        return self._resource
-
-    @property
-    @lru_cache()
-    def blueprint(self):
-        """
-        The blueprint model
-        """
-        return self.model.blueprint.get(self.deployment.blueprint_id)
-
-    @property
-    @lru_cache()
-    def deployment(self):
-        """
-        The deployment model
-        """
-        return self.model.deployment.get(self._deployment_id)
-
-    @property
-    def execution(self):
-        """
-        The execution model
-        """
-        return self.model.execution.get(self._execution_id)
-
-    @execution.setter
-    def execution(self, value):
-        """
-        Store the execution in the model storage
-        """
-        self.model.execution.store(value)
-
-    @property
-    def name(self):
-        """
-        The operation name
-        :return:
-        """
-        return self._name
-
-    @property
-    def id(self):
-        """
-        The operation id
-        :return:
-        """
-        return self._id
-
-    def download_resource(self, destination, path=None):
-        """
-        Download a blueprint resource from the resource storage
-        """
-        try:
-            return self.resource.deployment.download(entry_id=self.deployment.id,
-                                                     destination=destination,
-                                                     path=path)
-        except exceptions.StorageError:
-            return self.resource.blueprint.download(entry_id=self.blueprint.id,
-                                                    destination=destination,
-                                                    path=path)
-
-    @lru_cache()
-    def get_resource(self, path=None):
-        """
-        Read a deployment resource as string from the resource storage
-        """
-        try:
-            return self.resource.deployment.data(entry_id=self.deployment.id, path=path)
-        except exceptions.StorageError:
-            return self.resource.blueprint.data(entry_id=self.blueprint.id, path=path)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/context/operation.py b/aria/context/operation.py
deleted file mode 100644
index bf3686d..0000000
--- a/aria/context/operation.py
+++ /dev/null
@@ -1,127 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow and operation contexts
-"""
-
-
-from .common import BaseContext
-
-
-class BaseOperationContext(BaseContext):
-    """
-    Context object used during operation creation and execution
-    """
-
-    def __init__(self, name, workflow_context, task, **kwargs):
-        super(BaseOperationContext, self).__init__(
-            name=name,
-            model_storage=workflow_context.model,
-            resource_storage=workflow_context.resource,
-            deployment_id=workflow_context._deployment_id,
-            workflow_id=workflow_context._workflow_id,
-            execution_id=workflow_context._execution_id,
-            **kwargs)
-        self._task_model = task
-        self._actor = self.task.actor
-
-    def __repr__(self):
-        details = 'operation_mapping={task.operation_mapping}; ' \
-                  'operation_inputs={task.inputs}'\
-            .format(task=self.task)
-        return '{name}({0})'.format(details, name=self.name)
-
-    @property
-    def task(self):
-        """
-        The task in the model storage
-        :return: Task model
-        """
-        return self._task_model
-
-
-class NodeOperationContext(BaseOperationContext):
-    """
-    Context for node based operations.
-    """
-    @property
-    def node(self):
-        """
-        the node of the current operation
-        :return:
-        """
-        return self._actor.node
-
-    @property
-    def node_instance(self):
-        """
-        The node instance of the current operation
-        :return:
-        """
-        return self._actor
-
-
-class RelationshipOperationContext(BaseOperationContext):
-    """
-    Context for relationship based operations.
-    """
-    @property
-    def source_node(self):
-        """
-        The source node
-        :return:
-        """
-        return self.model.node.get(self.relationship.source_id)
-
-    @property
-    def source_node_instance(self):
-        """
-        The source node instance
-        :return:
-        """
-        return self.model.node_instance.get(self.relationship_instance.source_id)
-
-    @property
-    def target_node(self):
-        """
-        The target node
-        :return:
-        """
-        return self.model.node.get(self.relationship.target_id)
-
-    @property
-    def target_node_instance(self):
-        """
-        The target node instance
-        :return:
-        """
-        return self.model.node_instance.get(self._actor.target_id)
-
-    @property
-    def relationship(self):
-        """
-        The relationship of the current operation
-        :return:
-        """
-        return self._actor.relationship
-
-    @property
-    def relationship_instance(self):
-        """
-        The relationship instance of the current operation
-        :return:
-        """
-        return self._actor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/context/toolbelt.py
----------------------------------------------------------------------
diff --git a/aria/context/toolbelt.py b/aria/context/toolbelt.py
deleted file mode 100644
index 0aad89c..0000000
--- a/aria/context/toolbelt.py
+++ /dev/null
@@ -1,75 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-"""
-Provides different tools for operations.
-"""
-
-from . import operation
-
-
-class NodeToolBelt(object):
-    """
-    Node operation related tool belt
-    """
-    def __init__(self, operation_context):
-        self._op_context = operation_context
-
-    @property
-    def dependent_node_instances(self):
-        """
-        Any node instance which has a relationship to the current node instance.
-        :return:
-        """
-        assert isinstance(self._op_context, operation.NodeOperationContext)
-        node_instances = self._op_context.model.node_instance.iter(
-            filters={'deployment_id': self._op_context.deployment.id}
-        )
-        for node_instance in node_instances:
-            for relationship_instance in node_instance.relationship_instances:
-                if relationship_instance.target_id == self._op_context.node_instance.id:
-                    yield node_instance
-
-    @property
-    def host_ip(self):
-        """
-        The host ip of the current node
-        :return:
-        """
-        assert isinstance(self._op_context, operation.NodeOperationContext)
-        host_id = self._op_context._actor.host_id
-        host_instance = self._op_context.model.node_instance.get(host_id)
-        return host_instance.runtime_properties.get('ip')
-
-
-class RelationshipToolBelt(object):
-    """
-    Relationship operation related tool belt
-    """
-    def __init__(self, operation_context):
-        self._op_context = operation_context
-
-
-def toolbelt(operation_context):
-    """
-    Get a toolbelt according to the current operation executor
-    :param operation_context:
-    :return:
-    """
-    if isinstance(operation_context, operation.NodeOperationContext):
-        return NodeToolBelt(operation_context)
-    elif isinstance(operation_context, operation.RelationshipOperationContext):
-        return RelationshipToolBelt(operation_context)
-    else:
-        raise RuntimeError("Operation context not supported")

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/context/workflow.py b/aria/context/workflow.py
deleted file mode 100644
index 0495bdc..0000000
--- a/aria/context/workflow.py
+++ /dev/null
@@ -1,119 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow and operation contexts
-"""
-
-import threading
-from contextlib import contextmanager
-
-from aria import exceptions
-
-from .common import BaseContext
-
-
-class ContextException(exceptions.AriaError):
-    """
-    Context based exception
-    """
-    pass
-
-
-class WorkflowContext(BaseContext):
-    """
-    Context object used during workflow creation and execution
-    """
-    def __init__(self, parameters=None, *args, **kwargs):
-        super(WorkflowContext, self).__init__(*args, **kwargs)
-        self.parameters = parameters or {}
-        # TODO: execution creation should happen somewhere else
-        # should be moved there, when such logical place exists
-        try:
-            self.model.execution.get(self._execution_id)
-        except exceptions.StorageError:
-            self._create_execution()
-
-    def __repr__(self):
-        return (
-            '{name}(deployment_id={self._deployment_id}, '
-            'workflow_id={self._workflow_id}, '
-            'execution_id={self._execution_id})'.format(
-                name=self.__class__.__name__, self=self))
-
-    def _create_execution(self):
-        execution_cls = self.model.execution.model_cls
-        execution = self.model.execution.model_cls(
-            id=self._execution_id,
-            deployment_id=self.deployment.id,
-            workflow_id=self._workflow_id,
-            blueprint_id=self.blueprint.id,
-            status=execution_cls.PENDING,
-            parameters=self.parameters,
-        )
-        self.model.execution.store(execution)
-
-    @property
-    def nodes(self):
-        """
-        Iterator over nodes
-        """
-        return self.model.node.iter(filters={'blueprint_id': self.blueprint.id})
-
-    @property
-    def node_instances(self):
-        """
-        Iterator over node instances
-        """
-        return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id})
-
-
-class _CurrentContext(threading.local):
-    """
-    Provides thread-level context, which sugarcoats the task api.
-    """
-
-    def __init__(self):
-        super(_CurrentContext, self).__init__()
-        self._workflow_context = None
-
-    def _set(self, value):
-        self._workflow_context = value
-
-    def get(self):
-        """
-        Retrieves the current workflow context
-        :return: the workflow context
-        :rtype: WorkflowContext
-        """
-        if self._workflow_context is not None:
-            return self._workflow_context
-        raise ContextException("No context was set")
-
-    @contextmanager
-    def push(self, workflow_context):
-        """
-        Switches the current context to the provided context
-        :param workflow_context: the context to switch to.
-        :yields: the current context
-        """
-        prev_workflow_context = self._workflow_context
-        self._set(workflow_context)
-        try:
-            yield self
-        finally:
-            self._set(prev_workflow_context)
-
-current = _CurrentContext()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/decorators.py
----------------------------------------------------------------------
diff --git a/aria/decorators.py b/aria/decorators.py
deleted file mode 100644
index 8bde0ef..0000000
--- a/aria/decorators.py
+++ /dev/null
@@ -1,73 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow and operation decorators
-"""
-
-from uuid import uuid4
-from functools import partial, wraps
-
-from . import context
-from .workflows.api import task_graph
-from .tools.validation import validate_function_arguments
-
-
-def workflow(func=None, suffix_template=''):
-    """
-    Workflow decorator
-    """
-    if func is None:
-        return partial(workflow, suffix_template=suffix_template)
-
-    @wraps(func)
-    def _wrapper(ctx, **workflow_parameters):
-
-        workflow_name = _generate_name(
-            func_name=func.__name__,
-            suffix_template=suffix_template,
-            ctx=ctx,
-            **workflow_parameters)
-
-        workflow_parameters.setdefault('ctx', ctx)
-        workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name))
-        validate_function_arguments(func, workflow_parameters)
-        with context.workflow.current.push(ctx):
-            func(**workflow_parameters)
-        return workflow_parameters['graph']
-    return _wrapper
-
-
-def operation(func=None, toolbelt=False, suffix_template=''):
-    """
-    Operation decorator
-    """
-    if func is None:
-        return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt)
-
-    @wraps(func)
-    def _wrapper(**func_kwargs):
-        if toolbelt:
-            operation_toolbelt = context.toolbelt(func_kwargs['ctx'])
-            func_kwargs.setdefault('toolbelt', operation_toolbelt)
-        validate_function_arguments(func, func_kwargs)
-        return func(**func_kwargs)
-    return _wrapper
-
-
-def _generate_name(func_name, ctx, suffix_template, **custom_kwargs):
-    return '{func_name}.{suffix}'.format(
-        func_name=func_name,
-        suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4()))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/events/__init__.py
----------------------------------------------------------------------
diff --git a/aria/events/__init__.py b/aria/events/__init__.py
deleted file mode 100644
index 2e88733..0000000
--- a/aria/events/__init__.py
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-ARIA's events Sub-Package
-Path: aria.events
-
-Events package provides events mechanism for different executions in aria.
-
-
-1. storage_event_handler: implementation of storage handlers for workflow and operation events.
-2. logger_event_handler: implementation of logger handlers for workflow and operation events.
-
-API:
-    * start_task_signal
-    * on_success_task_signal
-    * on_failure_task_signal
-    * start_workflow_signal
-    * on_success_workflow_signal
-    * on_failure_workflow_signal
-"""
-
-import os
-
-from blinker import signal
-
-from ..tools.plugin import plugin_installer
-
-# workflow engine task signals:
-sent_task_signal = signal('sent_task_signal')
-start_task_signal = signal('start_task_signal')
-on_success_task_signal = signal('success_task_signal')
-on_failure_task_signal = signal('failure_task_signal')
-
-# workflow engine workflow signals:
-start_workflow_signal = signal('start_workflow_signal')
-on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
-on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
-on_success_workflow_signal = signal('on_success_workflow_signal')
-on_failure_workflow_signal = signal('on_failure_workflow_signal')
-
-plugin_installer(
-    path=os.path.dirname(os.path.realpath(__file__)),
-    plugin_suffix='_event_handler',
-    package=__package__)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/events/builtin_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/builtin_event_handler.py b/aria/events/builtin_event_handler.py
deleted file mode 100644
index c5cccfe..0000000
--- a/aria/events/builtin_event_handler.py
+++ /dev/null
@@ -1,123 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Aria's events Sub-Package
-Path: aria.events.storage_event_handler
-
-Implementation of storage handlers for workflow and operation events.
-"""
-
-
-from datetime import (
-    datetime,
-    timedelta,
-)
-
-from . import (
-    start_workflow_signal,
-    on_success_workflow_signal,
-    on_failure_workflow_signal,
-    on_cancelled_workflow_signal,
-    on_cancelling_workflow_signal,
-    sent_task_signal,
-    start_task_signal,
-    on_success_task_signal,
-    on_failure_task_signal,
-)
-
-
-@sent_task_signal.connect
-def _task_sent(task, *args, **kwargs):
-    with task._update():
-        task.status = task.SENT
-
-
-@start_task_signal.connect
-def _task_started(task, *args, **kwargs):
-    with task._update():
-        task.started_at = datetime.utcnow()
-        task.status = task.STARTED
-
-
-@on_failure_task_signal.connect
-def _task_failed(task, *args, **kwargs):
-    with task._update():
-        should_retry = (
-            (task.retry_count < task.max_attempts - 1 or
-             task.max_attempts == task.INFINITE_RETRIES) and
-            # ignore_failure check here means the task will not be retries and it will be marked as
-            # failed. The engine will also look at ignore_failure so it won't fail the workflow.
-            not task.ignore_failure)
-        if should_retry:
-            task.status = task.RETRYING
-            task.retry_count += 1
-            task.due_at = datetime.utcnow() + timedelta(seconds=task.retry_interval)
-        else:
-            task.ended_at = datetime.utcnow()
-            task.status = task.FAILED
-
-
-@on_success_task_signal.connect
-def _task_succeeded(task, *args, **kwargs):
-    with task._update():
-        task.ended_at = datetime.utcnow()
-        task.status = task.SUCCESS
-
-
-@start_workflow_signal.connect
-def _workflow_started(workflow_context, *args, **kwargs):
-    execution = workflow_context.execution
-    execution.status = execution.STARTED
-    execution.started_at = datetime.utcnow()
-    workflow_context.execution = execution
-
-
-@on_failure_workflow_signal.connect
-def _workflow_failed(workflow_context, exception, *args, **kwargs):
-    execution = workflow_context.execution
-    execution.error = str(exception)
-    execution.status = execution.FAILED
-    execution.ended_at = datetime.utcnow()
-    workflow_context.execution = execution
-
-
-@on_success_workflow_signal.connect
-def _workflow_succeeded(workflow_context, *args, **kwargs):
-    execution = workflow_context.execution
-    execution.status = execution.TERMINATED
-    execution.ended_at = datetime.utcnow()
-    workflow_context.execution = execution
-
-
-@on_cancelled_workflow_signal.connect
-def _workflow_cancelled(workflow_context, *args, **kwargs):
-    execution = workflow_context.execution
-    # _workflow_cancelling function may have called this function
-    # already
-    if execution.status == execution.CANCELLED:
-        return
-    execution.status = execution.CANCELLED
-    execution.ended_at = datetime.utcnow()
-    workflow_context.execution = execution
-
-
-@on_cancelling_workflow_signal.connect
-def _workflow_cancelling(workflow_context, *args, **kwargs):
-    execution = workflow_context.execution
-    if execution.status == execution.PENDING:
-        return _workflow_cancelled(workflow_context=workflow_context)
-    execution.status = execution.CANCELLING
-    workflow_context.execution = execution

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/events/workflow_engine_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/events/workflow_engine_event_handler.py b/aria/events/workflow_engine_event_handler.py
deleted file mode 100644
index 7df11d1..0000000
--- a/aria/events/workflow_engine_event_handler.py
+++ /dev/null
@@ -1,74 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-
-"""
-Aria's events Sub-Package
-Path: aria.events.storage_event_handler
-
-Implementation of logger handlers for workflow and operation events.
-"""
-
-from . import (
-    start_task_signal,
-    on_success_task_signal,
-    on_failure_task_signal,
-    start_workflow_signal,
-    on_success_workflow_signal,
-    on_failure_workflow_signal,
-    on_cancelled_workflow_signal,
-    on_cancelling_workflow_signal,
-)
-
-
-@start_task_signal.connect
-def _start_task_handler(task, **kwargs):
-    task.logger.debug('Event: Starting task: {task.name}'.format(task=task))
-
-
-@on_success_task_signal.connect
-def _success_task_handler(task, **kwargs):
-    task.logger.debug('Event: Task success: {task.name}'.format(task=task))
-
-
-@on_failure_task_signal.connect
-def _failure_operation_handler(task, **kwargs):
-    task.logger.error('Event: Task failure: {task.name}'.format(task=task),
-                      exc_info=kwargs.get('exception', True))
-
-
-@start_workflow_signal.connect
-def _start_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context))
-
-
-@on_failure_workflow_signal.connect
-def _failure_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context))
-
-
-@on_success_workflow_signal.connect
-def _success_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow success: {context.name}'.format(context=context))
-
-
-@on_cancelled_workflow_signal.connect
-def _cancel_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context))
-
-
-@on_cancelling_workflow_signal.connect
-def _cancelling_workflow_handler(context, **kwargs):
-    context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/exceptions.py b/aria/exceptions.py
index d68b5a2..1e99c4e 100644
--- a/aria/exceptions.py
+++ b/aria/exceptions.py
@@ -19,8 +19,6 @@ Every sub-package in Aria has a module with its exceptions.
 aria.exceptions module conveniently collects all these exceptions for easier imports.
 """
 
-from .workflows.exceptions import *  # pylint: disable=wildcard-import,unused-wildcard-import
-
 
 class AriaError(Exception):
     """
@@ -29,8 +27,3 @@ class AriaError(Exception):
     pass
 
 
-class StorageError(AriaError):
-    """
-    General storage exception
-    """
-    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/__init__.py b/aria/orchestrator/__init__.py
new file mode 100644
index 0000000..cf2f066
--- /dev/null
+++ b/aria/orchestrator/__init__.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from decorators import workflow, operation
+
+from . import (
+    context,
+    events,
+    workflows,
+    decorators
+)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/context/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/__init__.py b/aria/orchestrator/context/__init__.py
new file mode 100644
index 0000000..ad89b13
--- /dev/null
+++ b/aria/orchestrator/context/__init__.py
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Provides contexts to workflow and operation
+"""
+
+from . import workflow, operation
+from .toolbelt import toolbelt

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
new file mode 100644
index 0000000..2142d01
--- /dev/null
+++ b/aria/orchestrator/context/common.py
@@ -0,0 +1,144 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+A common context for both workflow and operation
+"""
+from uuid import uuid4
+
+from aria import logger, exceptions
+from aria.tools.lru_cache import lru_cache
+
+
+class BaseContext(logger.LoggerMixin):
+    """
+    Base context object for workflow and operation
+    """
+
+    def __init__(
+            self,
+            name,
+            model_storage,
+            resource_storage,
+            deployment_id,
+            workflow_id,
+            execution_id=None,
+            task_max_attempts=1,
+            task_retry_interval=0,
+            task_ignore_failure=False,
+            **kwargs):
+        super(BaseContext, self).__init__(**kwargs)
+        self._name = name
+        self._id = str(uuid4())
+        self._model = model_storage
+        self._resource = resource_storage
+        self._deployment_id = deployment_id
+        self._workflow_id = workflow_id
+        self._execution_id = execution_id or str(uuid4())
+        self._task_max_attempts = task_max_attempts
+        self._task_retry_interval = task_retry_interval
+        self._task_ignore_failure = task_ignore_failure
+
+    def __repr__(self):
+        return (
+            '{name}(name={self.name}, '
+            'deployment_id={self._deployment_id}, '
+            'workflow_id={self._workflow_id}, '
+            'execution_id={self._execution_id})'
+            .format(name=self.__class__.__name__, self=self))
+
+    @property
+    def model(self):
+        """
+        Access to the model storage
+        :return:
+        """
+        return self._model
+
+    @property
+    def resource(self):
+        """
+        Access to the resource storage
+        :return:
+        """
+        return self._resource
+
+    @property
+    @lru_cache()
+    def blueprint(self):
+        """
+        The blueprint model
+        """
+        return self.model.blueprint.get(self.deployment.blueprint_id)
+
+    @property
+    @lru_cache()
+    def deployment(self):
+        """
+        The deployment model
+        """
+        return self.model.deployment.get(self._deployment_id)
+
+    @property
+    def execution(self):
+        """
+        The execution model
+        """
+        return self.model.execution.get(self._execution_id)
+
+    @execution.setter
+    def execution(self, value):
+        """
+        Store the execution in the model storage
+        """
+        self.model.execution.store(value)
+
+    @property
+    def name(self):
+        """
+        The context name
+        :return:
+        """
+        return self._name
+
+    @property
+    def id(self):
+        """
+        The context id
+        :return:
+        """
+        return self._id
+
+    def download_resource(self, destination, path=None):
+        """
+        Download a deployment resource, falling back to the blueprint resource
+        """
+        try:
+            return self.resource.deployment.download(entry_id=self.deployment.id,
+                                                     destination=destination,
+                                                     path=path)
+        except exceptions.StorageError:
+            return self.resource.blueprint.download(entry_id=self.blueprint.id,
+                                                    destination=destination,
+                                                    path=path)
+
+    @lru_cache()
+    def get_resource(self, path=None):
+        """
+        Read a deployment resource as string from the resource storage
+        """
+        try:
+            return self.resource.deployment.data(entry_id=self.deployment.id, path=path)
+        except exceptions.StorageError:
+            return self.resource.blueprint.data(entry_id=self.blueprint.id, path=path)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/context/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/exceptions.py b/aria/orchestrator/context/exceptions.py
new file mode 100644
index 0000000..6704bbc
--- /dev/null
+++ b/aria/orchestrator/context/exceptions.py
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from ..exceptions import OrchestratorError
+
+
+class ContextException(OrchestratorError):
+    """
+    Context based exception
+    """
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
new file mode 100644
index 0000000..bf3686d
--- /dev/null
+++ b/aria/orchestrator/context/operation.py
@@ -0,0 +1,127 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Operation contexts
+"""
+
+
+from .common import BaseContext
+
+
+class BaseOperationContext(BaseContext):
+    """
+    Context object used during operation creation and execution
+    """
+
+    def __init__(self, name, workflow_context, task, **kwargs):
+        super(BaseOperationContext, self).__init__(
+            name=name,
+            model_storage=workflow_context.model,
+            resource_storage=workflow_context.resource,
+            deployment_id=workflow_context._deployment_id,
+            workflow_id=workflow_context._workflow_id,
+            execution_id=workflow_context._execution_id,
+            **kwargs)
+        self._task_model = task
+        self._actor = self.task.actor
+
+    def __repr__(self):
+        details = 'operation_mapping={task.operation_mapping}; ' \
+                  'operation_inputs={task.inputs}'\
+            .format(task=self.task)
+        return '{name}({0})'.format(details, name=self.name)
+
+    @property
+    def task(self):
+        """
+        The task in the model storage
+        :return: Task model
+        """
+        return self._task_model
+
+
+class NodeOperationContext(BaseOperationContext):
+    """
+    Context for node based operations.
+    """
+    @property
+    def node(self):
+        """
+        the node of the current operation
+        :return:
+        """
+        return self._actor.node
+
+    @property
+    def node_instance(self):
+        """
+        The node instance of the current operation
+        :return:
+        """
+        return self._actor
+
+
+class RelationshipOperationContext(BaseOperationContext):
+    """
+    Context for relationship based operations.
+    """
+    @property
+    def source_node(self):
+        """
+        The source node
+        :return:
+        """
+        return self.model.node.get(self.relationship.source_id)
+
+    @property
+    def source_node_instance(self):
+        """
+        The source node instance
+        :return:
+        """
+        return self.model.node_instance.get(self.relationship_instance.source_id)
+
+    @property
+    def target_node(self):
+        """
+        The target node
+        :return:
+        """
+        return self.model.node.get(self.relationship.target_id)
+
+    @property
+    def target_node_instance(self):
+        """
+        The target node instance
+        :return:
+        """
+        return self.model.node_instance.get(self._actor.target_id)
+
+    @property
+    def relationship(self):
+        """
+        The relationship of the current operation
+        :return:
+        """
+        return self._actor.relationship
+
+    @property
+    def relationship_instance(self):
+        """
+        The relationship instance of the current operation
+        :return:
+        """
+        return self._actor

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/context/toolbelt.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py
new file mode 100644
index 0000000..0aad89c
--- /dev/null
+++ b/aria/orchestrator/context/toolbelt.py
@@ -0,0 +1,75 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+"""
+Provides different tools for operations.
+"""
+
+from . import operation
+
+
+class NodeToolBelt(object):
+    """
+    Node operation related tool belt
+    """
+    def __init__(self, operation_context):
+        self._op_context = operation_context
+
+    @property
+    def dependent_node_instances(self):
+        """
+        Any node instance which has a relationship to the current node instance.
+        :return:
+        """
+        assert isinstance(self._op_context, operation.NodeOperationContext)
+        node_instances = self._op_context.model.node_instance.iter(
+            filters={'deployment_id': self._op_context.deployment.id}
+        )
+        for node_instance in node_instances:
+            for relationship_instance in node_instance.relationship_instances:
+                if relationship_instance.target_id == self._op_context.node_instance.id:
+                    yield node_instance
+
+    @property
+    def host_ip(self):
+        """
+        The host ip of the current node
+        :return:
+        """
+        assert isinstance(self._op_context, operation.NodeOperationContext)
+        host_id = self._op_context._actor.host_id
+        host_instance = self._op_context.model.node_instance.get(host_id)
+        return host_instance.runtime_properties.get('ip')
+
+
+class RelationshipToolBelt(object):
+    """
+    Relationship operation related tool belt
+    """
+    def __init__(self, operation_context):
+        self._op_context = operation_context
+
+
+def toolbelt(operation_context):
+    """
+    Get a toolbelt matching the type of the given operation context
+    :param operation_context:
+    :return:
+    """
+    if isinstance(operation_context, operation.NodeOperationContext):
+        return NodeToolBelt(operation_context)
+    elif isinstance(operation_context, operation.RelationshipOperationContext):
+        return RelationshipToolBelt(operation_context)
+    else:
+        raise RuntimeError("Operation context not supported")

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
new file mode 100644
index 0000000..3dc222b
--- /dev/null
+++ b/aria/orchestrator/context/workflow.py
@@ -0,0 +1,113 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow context and thread-local access to the current workflow context
+"""
+
+import threading
+from contextlib import contextmanager
+
+from aria import storage
+
+from .exceptions import ContextException
+from .common import BaseContext
+
+
+class WorkflowContext(BaseContext):
+    """
+    Context object used during workflow creation and execution
+    """
+    def __init__(self, parameters=None, *args, **kwargs):
+        super(WorkflowContext, self).__init__(*args, **kwargs)
+        self.parameters = parameters or {}
+        # TODO: execution creation should happen somewhere else
+        # should be moved there, when such logical place exists
+        try:
+            self.model.execution.get(self._execution_id)
+        except storage.exceptions.StorageError:
+            self._create_execution()
+
+    def __repr__(self):
+        return (
+            '{name}(deployment_id={self._deployment_id}, '
+            'workflow_id={self._workflow_id}, '
+            'execution_id={self._execution_id})'.format(
+                name=self.__class__.__name__, self=self))
+
+    def _create_execution(self):
+        execution_cls = self.model.execution.model_cls
+        execution = self.model.execution.model_cls(
+            id=self._execution_id,
+            deployment_id=self.deployment.id,
+            workflow_id=self._workflow_id,
+            blueprint_id=self.blueprint.id,
+            status=execution_cls.PENDING,
+            parameters=self.parameters,
+        )
+        self.model.execution.store(execution)
+
+    @property
+    def nodes(self):
+        """
+        Iterator over nodes
+        """
+        return self.model.node.iter(filters={'blueprint_id': self.blueprint.id})
+
+    @property
+    def node_instances(self):
+        """
+        Iterator over node instances
+        """
+        return self.model.node_instance.iter(filters={'deployment_id': self.deployment.id})
+
+
+class _CurrentContext(threading.local):
+    """
+    Provides thread-level context, which sugarcoats the task api.
+    """
+
+    def __init__(self):
+        super(_CurrentContext, self).__init__()
+        self._workflow_context = None
+
+    def _set(self, value):
+        self._workflow_context = value
+
+    def get(self):
+        """
+        Retrieves the current workflow context
+        :return: the workflow context
+        :rtype: WorkflowContext
+        """
+        if self._workflow_context is not None:
+            return self._workflow_context
+        raise ContextException("No context was set")
+
+    @contextmanager
+    def push(self, workflow_context):
+        """
+        Switches the current context to the provided context
+        :param workflow_context: the context to switch to.
+        :yields: the current context
+        """
+        prev_workflow_context = self._workflow_context
+        self._set(workflow_context)
+        try:
+            yield self
+        finally:
+            self._set(prev_workflow_context)
+
+current = _CurrentContext()

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/decorators.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/decorators.py b/aria/orchestrator/decorators.py
new file mode 100644
index 0000000..7f7685d
--- /dev/null
+++ b/aria/orchestrator/decorators.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow and operation decorators
+"""
+
+from uuid import uuid4
+from functools import partial, wraps
+
+from aria.tools.validation import validate_function_arguments
+
+from . import context
+from .workflows.api import task_graph
+
+
+def workflow(func=None, suffix_template=''):
+    """
+    Workflow decorator
+    """
+    if func is None:
+        return partial(workflow, suffix_template=suffix_template)
+
+    @wraps(func)
+    def _wrapper(ctx, **workflow_parameters):
+
+        workflow_name = _generate_name(
+            func_name=func.__name__,
+            suffix_template=suffix_template,
+            ctx=ctx,
+            **workflow_parameters)
+
+        workflow_parameters.setdefault('ctx', ctx)
+        workflow_parameters.setdefault('graph', task_graph.TaskGraph(workflow_name))
+        validate_function_arguments(func, workflow_parameters)
+        with context.workflow.current.push(ctx):
+            func(**workflow_parameters)
+        return workflow_parameters['graph']
+    return _wrapper
+
+
+def operation(func=None, toolbelt=False, suffix_template=''):
+    """
+    Operation decorator
+    """
+    if func is None:
+        return partial(operation, suffix_template=suffix_template, toolbelt=toolbelt)
+
+    @wraps(func)
+    def _wrapper(**func_kwargs):
+        if toolbelt:
+            operation_toolbelt = context.toolbelt(func_kwargs['ctx'])
+            func_kwargs.setdefault('toolbelt', operation_toolbelt)
+        validate_function_arguments(func, func_kwargs)
+        return func(**func_kwargs)
+    return _wrapper
+
+
+def _generate_name(func_name, ctx, suffix_template, **custom_kwargs):
+    return '{func_name}.{suffix}'.format(
+        func_name=func_name,
+        suffix=suffix_template.format(ctx=ctx, **custom_kwargs) or str(uuid4()))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/events/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events/__init__.py b/aria/orchestrator/events/__init__.py
new file mode 100644
index 0000000..40bdd24
--- /dev/null
+++ b/aria/orchestrator/events/__init__.py
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+ARIA's events Sub-Package
+Path: aria.orchestrator.events
+
+Events package provides events mechanism for different executions in aria.
+
+
+1. builtin_event_handler: implementation of storage handlers for workflow and operation events.
+2. workflow_engine_event_handler: logger handlers for workflow and operation events.
+
+API:
+    * start_task_signal
+    * on_success_task_signal
+    * on_failure_task_signal
+    * start_workflow_signal
+    * on_success_workflow_signal
+    * on_failure_workflow_signal
+"""
+
+import os
+
+from blinker import signal
+
+from aria.tools import plugin_installer
+
+# workflow engine task signals:
+sent_task_signal = signal('sent_task_signal')
+start_task_signal = signal('start_task_signal')
+on_success_task_signal = signal('success_task_signal')
+on_failure_task_signal = signal('failure_task_signal')
+
+# workflow engine workflow signals:
+start_workflow_signal = signal('start_workflow_signal')
+on_cancelling_workflow_signal = signal('on_cancelling_workflow_signal')
+on_cancelled_workflow_signal = signal('on_cancelled_workflow_signal')
+on_success_workflow_signal = signal('on_success_workflow_signal')
+on_failure_workflow_signal = signal('on_failure_workflow_signal')
+
+plugin_installer(
+    path=os.path.dirname(os.path.realpath(__file__)),
+    plugin_suffix='_event_handler',
+    package=__package__)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/events/builtin_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events/builtin_event_handler.py b/aria/orchestrator/events/builtin_event_handler.py
new file mode 100644
index 0000000..c5cccfe
--- /dev/null
+++ b/aria/orchestrator/events/builtin_event_handler.py
@@ -0,0 +1,123 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Aria's events Sub-Package
+Path: aria.orchestrator.events.builtin_event_handler
+
+Implementation of storage handlers for workflow and operation events.
+"""
+
+
+from datetime import (
+    datetime,
+    timedelta,
+)
+
+from . import (
+    start_workflow_signal,
+    on_success_workflow_signal,
+    on_failure_workflow_signal,
+    on_cancelled_workflow_signal,
+    on_cancelling_workflow_signal,
+    sent_task_signal,
+    start_task_signal,
+    on_success_task_signal,
+    on_failure_task_signal,
+)
+
+
+@sent_task_signal.connect
+def _task_sent(task, *args, **kwargs):
+    with task._update():
+        task.status = task.SENT
+
+
+@start_task_signal.connect
+def _task_started(task, *args, **kwargs):
+    with task._update():
+        task.started_at = datetime.utcnow()
+        task.status = task.STARTED
+
+
+@on_failure_task_signal.connect
+def _task_failed(task, *args, **kwargs):
+    with task._update():
+        should_retry = (
+            (task.retry_count < task.max_attempts - 1 or
+             task.max_attempts == task.INFINITE_RETRIES) and
+            # ignore_failure check here means the task will not be retried and it will be marked as
+            # failed. The engine will also look at ignore_failure so it won't fail the workflow.
+            not task.ignore_failure)
+        if should_retry:
+            task.status = task.RETRYING
+            task.retry_count += 1
+            task.due_at = datetime.utcnow() + timedelta(seconds=task.retry_interval)
+        else:
+            task.ended_at = datetime.utcnow()
+            task.status = task.FAILED
+
+
+@on_success_task_signal.connect
+def _task_succeeded(task, *args, **kwargs):
+    with task._update():
+        task.ended_at = datetime.utcnow()
+        task.status = task.SUCCESS
+
+
+@start_workflow_signal.connect
+def _workflow_started(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    execution.status = execution.STARTED
+    execution.started_at = datetime.utcnow()
+    workflow_context.execution = execution
+
+
+@on_failure_workflow_signal.connect
+def _workflow_failed(workflow_context, exception, *args, **kwargs):
+    execution = workflow_context.execution
+    execution.error = str(exception)
+    execution.status = execution.FAILED
+    execution.ended_at = datetime.utcnow()
+    workflow_context.execution = execution
+
+
+@on_success_workflow_signal.connect
+def _workflow_succeeded(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    execution.status = execution.TERMINATED
+    execution.ended_at = datetime.utcnow()
+    workflow_context.execution = execution
+
+
+@on_cancelled_workflow_signal.connect
+def _workflow_cancelled(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    # _workflow_cancelling function may have called this function
+    # already
+    if execution.status == execution.CANCELLED:
+        return
+    execution.status = execution.CANCELLED
+    execution.ended_at = datetime.utcnow()
+    workflow_context.execution = execution
+
+
+@on_cancelling_workflow_signal.connect
+def _workflow_cancelling(workflow_context, *args, **kwargs):
+    execution = workflow_context.execution
+    if execution.status == execution.PENDING:
+        return _workflow_cancelled(workflow_context=workflow_context)
+    execution.status = execution.CANCELLING
+    workflow_context.execution = execution

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/events/workflow_engine_event_handler.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/events/workflow_engine_event_handler.py b/aria/orchestrator/events/workflow_engine_event_handler.py
new file mode 100644
index 0000000..7df11d1
--- /dev/null
+++ b/aria/orchestrator/events/workflow_engine_event_handler.py
@@ -0,0 +1,74 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+"""
+Aria's events Sub-Package
+Path: aria.orchestrator.events.workflow_engine_event_handler
+
+Implementation of logger handlers for workflow and operation events.
+"""
+
+from . import (
+    start_task_signal,
+    on_success_task_signal,
+    on_failure_task_signal,
+    start_workflow_signal,
+    on_success_workflow_signal,
+    on_failure_workflow_signal,
+    on_cancelled_workflow_signal,
+    on_cancelling_workflow_signal,
+)
+
+
+@start_task_signal.connect
+def _start_task_handler(task, **kwargs):
+    task.logger.debug('Event: Starting task: {task.name}'.format(task=task))
+
+
+@on_success_task_signal.connect
+def _success_task_handler(task, **kwargs):
+    task.logger.debug('Event: Task success: {task.name}'.format(task=task))
+
+
+@on_failure_task_signal.connect
+def _failure_operation_handler(task, **kwargs):
+    task.logger.error('Event: Task failure: {task.name}'.format(task=task),
+                      exc_info=kwargs.get('exception', True))
+
+
+@start_workflow_signal.connect
+def _start_workflow_handler(context, **kwargs):
+    context.logger.debug('Event: Starting workflow: {context.name}'.format(context=context))
+
+
+@on_failure_workflow_signal.connect
+def _failure_workflow_handler(context, **kwargs):
+    context.logger.debug('Event: Workflow failure: {context.name}'.format(context=context))
+
+
+@on_success_workflow_signal.connect
+def _success_workflow_handler(context, **kwargs):
+    context.logger.debug('Event: Workflow success: {context.name}'.format(context=context))
+
+
+@on_cancelled_workflow_signal.connect
+def _cancel_workflow_handler(context, **kwargs):
+    context.logger.debug('Event: Workflow cancelled: {context.name}'.format(context=context))
+
+
+@on_cancelling_workflow_signal.connect
+def _cancelling_workflow_handler(context, **kwargs):
+    context.logger.debug('Event: Workflow cancelling: {context.name}'.format(context=context))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
new file mode 100644
index 0000000..75b37cf
--- /dev/null
+++ b/aria/orchestrator/exceptions.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from aria.exceptions import AriaError
+
+
+class OrchestratorError(AriaError):
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/workflows/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/__init__.py b/aria/orchestrator/workflows/__init__.py
new file mode 100644
index 0000000..ae1e83e
--- /dev/null
+++ b/aria/orchestrator/workflows/__init__.py
@@ -0,0 +1,14 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/workflows/api/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/__init__.py b/aria/orchestrator/workflows/api/__init__.py
new file mode 100644
index 0000000..a3a17ee
--- /dev/null
+++ b/aria/orchestrator/workflows/api/__init__.py
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Provides API for building tasks
+"""
+
+from . import task, task_graph

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/675b820f/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
new file mode 100644
index 0000000..4d36725
--- /dev/null
+++ b/aria/orchestrator/workflows/api/task.py
@@ -0,0 +1,171 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Provides the tasks to be entered into the task graph
+"""
+from uuid import uuid4
+
+import aria
+
+from ... import context
+from .. import exceptions
+
+
+class BaseTask(object):
+    """
+    Abstract task_graph task
+    """
+    def __init__(self, ctx=None, **kwargs):
+        if ctx is not None:
+            self._workflow_context = ctx
+        else:
+            self._workflow_context = context.workflow.current.get()
+        self._id = str(uuid4())
+
+    @property
+    def id(self):
+        """
+        uuid4 generated id
+        :return:
+        """
+        return self._id
+
+    @property
+    def workflow_context(self):
+        """
+        the context of the current workflow
+        :return:
+        """
+        return self._workflow_context
+
+
+class OperationTask(BaseTask):
+    """
+    Represents an operation task in the task_graph
+    """
+
+    SOURCE_OPERATION = 'source_operations'
+    TARGET_OPERATION = 'target_operations'
+
+    def __init__(self,
+                 name,
+                 actor,
+                 operation_mapping,
+                 max_attempts=None,
+                 retry_interval=None,
+                 ignore_failure=None,
+                 inputs=None):
+        """
+        Creates an operation task using the name, details, node instance and any additional kwargs.
+        :param name: the name of the operation.
+        :param operation_mapping: the 'operation' entry of the operation's details.
+        :param actor: the operation host on which this operation is registered.
+        :param inputs: operation inputs.
+        """
+        assert isinstance(actor, (aria.storage.models.NodeInstance,
+                                  aria.storage.models.RelationshipInstance))
+        super(OperationTask, self).__init__()
+        self.actor = actor
+        self.name = '{name}.{actor.id}'.format(name=name, actor=actor)
+        self.operation_mapping = operation_mapping
+        self.inputs = inputs or {}
+        self.max_attempts = (self.workflow_context._task_max_attempts
+                             if max_attempts is None else max_attempts)
+        self.retry_interval = (self.workflow_context._task_retry_interval
+                               if retry_interval is None else retry_interval)
+        self.ignore_failure = (self.workflow_context._task_ignore_failure
+                               if ignore_failure is None else ignore_failure)
+
+    @classmethod
+    def node_instance(cls, instance, name, inputs=None, *args, **kwargs):
+        """
+        Represents a node based operation
+
+        :param instance: the node instance to which this operation belongs.
+        :param name: the name of the operation.
+        """
+        assert isinstance(instance, aria.storage.models.NodeInstance)
+        operation_details = instance.node.operations[name]
+        operation_inputs = operation_details.get('inputs', {})
+        operation_inputs.update(inputs or {})
+        return cls(name=name,
+                   actor=instance,
+                   operation_mapping=operation_details.get('operation', ''),
+                   inputs=operation_inputs,
+                   *args,
+                   **kwargs)
+
+    @classmethod
+    def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs):
+        """
+        Represents a relationship based operation
+
+        :param instance: the relationship instance to which this operation belongs.
+        :param name: the name of the operation.
+        :param operation_end: source or target end of the relationship, this corresponds directly
+        with 'source_operations' and 'target_operations'
+        :param inputs: any additional inputs to the operation
+        """
+        assert isinstance(instance, aria.storage.models.RelationshipInstance)
+        if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]:
+            raise exceptions.TaskException('The operation end should be {0} or {1}'.format(
+                cls.TARGET_OPERATION, cls.SOURCE_OPERATION
+            ))
+        operation_details = getattr(instance.relationship, operation_end)[name]
+        operation_inputs = operation_details.get('inputs', {})
+        operation_inputs.update(inputs or {})
+        return cls(actor=instance,
+                   name=name,
+                   operation_mapping=operation_details.get('operation'),
+                   inputs=operation_inputs,
+                   *args,
+                   **kwargs)
+
+
+class WorkflowTask(BaseTask):
+    """
+    Represents a workflow task in the task_graph
+    """
+    def __init__(self, workflow_func, **kwargs):
+        """
+        Creates a workflow based task using the workflow_func provided, and its kwargs
+        :param workflow_func: the function to run
+        :param kwargs: the kwargs that would be passed to the workflow_func
+        """
+        super(WorkflowTask, self).__init__(**kwargs)
+        kwargs['ctx'] = self.workflow_context
+        self._graph = workflow_func(**kwargs)
+
+    @property
+    def graph(self):
+        """
+        The graph constructed by the sub workflow
+        :return:
+        """
+        return self._graph
+
+    def __getattr__(self, item):
+        try:
+            return getattr(self._graph, item)
+        except AttributeError:
+            return super(WorkflowTask, self).__getattribute__(item)
+
+
+class StubTask(BaseTask):
+    """
+    Enables creating empty tasks.
+    """
+    pass


Mime
View raw message