ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dankil...@apache.org
Subject [2/2] incubator-ariatosca git commit: remove cloudify stuff
Date Wed, 19 Oct 2016 10:58:57 GMT
remove cloudify stuff


Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/c8a5c91d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/c8a5c91d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/c8a5c91d

Branch: refs/heads/wf-wip
Commit: c8a5c91d0046216fb78ba6b4e31270f0aff4fa0b
Parents: db4b207
Author: Dan Kilman <dankilman@gmail.com>
Authored: Wed Oct 19 13:58:46 2016 +0300
Committer: Dan Kilman <dankilman@gmail.com>
Committed: Wed Oct 19 13:58:46 2016 +0300

----------------------------------------------------------------------
 README.md                                       |    6 -
 aria/from_cloudify/__init__.py                  |    0
 aria/from_cloudify/workflows/__init__.py        |   20 -
 aria/from_cloudify/workflows/events.py          |  197 ---
 aria/from_cloudify/workflows/local.py           |  598 -------
 aria/from_cloudify/workflows/tasks.py           |  767 ---------
 aria/from_cloudify/workflows/tasks_graph.py     |  372 -----
 aria/from_cloudify/workflows/workflow_api.py    |   47 -
 .../from_cloudify/workflows/workflow_context.py | 1525 ------------------
 aria/workflows/builtin/heal.py                  |    4 +-
 aria/workflows/builtin/scale.py                 |   36 +-
 aria/workflows/builtin/workflows.py             |   28 +-
 ctx_api                                         |  113 --
 tests/events/test_builtin_event_handlers.py     |   10 +-
 14 files changed, 39 insertions(+), 3684 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c8a5c91d/README.md
----------------------------------------------------------------------
diff --git a/README.md b/README.md
index d58cc03..b945a29 100644
--- a/README.md
+++ b/README.md
@@ -1,10 +1,4 @@
 Aria
 ====
 
-
-[![Build Status](https://travis-ci.org/cloudify-cosmo/aria.svg?branch=master)](https://travis-ci.org/cloudify-cosmo/aria)
-[![PyPI](http://img.shields.io/pypi/dm/aria.svg)](http://img.shields.io/pypi/dm/aria.svg)
-[![PypI](http://img.shields.io/pypi/v/aria.svg)](http://img.shields.io/pypi/v/aria.svg)
-
-
 See http://ariatosca.org/

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c8a5c91d/aria/from_cloudify/__init__.py
----------------------------------------------------------------------
diff --git a/aria/from_cloudify/__init__.py b/aria/from_cloudify/__init__.py
deleted file mode 100644
index e69de29..0000000

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c8a5c91d/aria/from_cloudify/workflows/__init__.py
----------------------------------------------------------------------
diff --git a/aria/from_cloudify/workflows/__init__.py b/aria/from_cloudify/workflows/__init__.py
deleted file mode 100644
index 8a0a917..0000000
--- a/aria/from_cloudify/workflows/__init__.py
+++ /dev/null
@@ -1,20 +0,0 @@
-########
-# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#    * See the License for the specific language governing permissions and
-#    * limitations under the License.
-
-__author__ = 'dank'
-
-from cloudify.workflows import workflow_api as api  # noqa
-from cloudify.state import workflow_ctx as ctx  # noqa
-from cloudify.state import workflow_parameters as parameters  # noqa

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c8a5c91d/aria/from_cloudify/workflows/events.py
----------------------------------------------------------------------
diff --git a/aria/from_cloudify/workflows/events.py b/aria/from_cloudify/workflows/events.py
deleted file mode 100644
index b8faa1b..0000000
--- a/aria/from_cloudify/workflows/events.py
+++ /dev/null
@@ -1,197 +0,0 @@
-########
-# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#    * See the License for the specific language governing permissions and
-#    * limitations under the License.
-
-
-from cloudify import logs
-from cloudify.exceptions import OperationRetry
-from cloudify.workflows import tasks as tasks_api
-
-
-class Monitor(object):
-    """Monitor with handlers for different celery events"""
-
-    def __init__(self, tasks_graph):
-        """
-        :param tasks_graph: The task graph. Used to extract tasks based on the
-                            events task id.
-        """
-        self.tasks_graph = tasks_graph
-        self._receiver = None
-        self._should_stop = False
-
-    def task_sent(self, event):
-        pass
-
-    def task_received(self, event):
-        pass
-
-    def task_started(self, event):
-        self._handle(tasks_api.TASK_STARTED, event, send_event=True)
-
-    def task_succeeded(self, event):
-        self._handle(tasks_api.TASK_SUCCEEDED, event, send_event=True)
-
-    def task_failed(self, event):
-        if event.get('exception', '').startswith(OperationRetry.__name__):
-            state = tasks_api.TASK_RESCHEDULED
-        else:
-            state = tasks_api.TASK_FAILED
-        self._handle(state, event, send_event=False)
-
-    def task_revoked(self, event):
-        pass
-
-    def task_retried(self, event):
-        pass
-
-    def _handle(self, state, event, send_event):
-        task_id = event['uuid']
-        task = self.tasks_graph.get_task(task_id)
-        if task is not None:
-            if send_event:
-                send_task_event(state, task, send_task_event_func_remote,
-                                event)
-            task.set_state(state)
-
-    def capture(self):
-        # Only called when running within an agent, so import here
-        from cloudify_agent.app import app
-        with app.connection() as connection:
-            self._receiver = app.events.Receiver(connection, handlers={
-                'task-sent': self.task_sent,
-                'task-received': self.task_received,
-                'task-started': self.task_started,
-                'task-succeeded': self.task_succeeded,
-                'task-failed': self.task_failed,
-                'task-revoked': self.task_revoked,
-                'task-retried': self.task_retried
-            })
-            for _ in self._receiver.itercapture(limit=None,
-                                                timeout=None,
-                                                wakeup=True):
-                if self._should_stop:
-                    return
-
-    def stop(self):
-        self._should_stop = True
-        self._receiver.should_stop = True
-
-
-def send_task_event_func_remote(task, event_type, message,
-                                additional_context=None):
-    _send_task_event_func(task, event_type, message,
-                          out_func=logs.amqp_event_out,
-                          additional_context=additional_context)
-
-
-def send_task_event_func_local(task, event_type, message,
-                               additional_context=None):
-    _send_task_event_func(task, event_type, message,
-                          out_func=logs.stdout_event_out,
-                          additional_context=additional_context)
-
-
-def _send_task_event_func(task, event_type, message, out_func,
-                          additional_context):
-    if task.cloudify_context is None:
-        logs.send_workflow_event(ctx=task.workflow_context,
-                                 event_type=event_type,
-                                 message=message,
-                                 out_func=out_func,
-                                 additional_context=additional_context)
-    else:
-        logs.send_task_event(cloudify_context=task.cloudify_context,
-                             event_type=event_type,
-                             message=message,
-                             out_func=out_func,
-                             additional_context=additional_context)
-
-
-def _filter_task(task, state):
-    return state != tasks_api.TASK_FAILED and not task.send_task_events
-
-
-def send_task_event(state, task, send_event_func, event):
-    """
-    Send a task event delegating to 'send_event_func'
-    which will send events to RabbitMQ or use the workflow context logger
-    in local context
-
-    :param state: the task state (valid: ['sending', 'started', 'rescheduled',
-                  'succeeded', 'failed'])
-    :param task: a WorkflowTask instance to send the event for
-    :param send_event_func: function for actually sending the event somewhere
-    :param event: a dict with either a result field or an exception fields
-                  follows celery event structure but used by local tasks as
-                  well
-    """
-    if _filter_task(task, state):
-        return
-
-    if state in (tasks_api.TASK_FAILED, tasks_api.TASK_RESCHEDULED,
-                 tasks_api.TASK_SUCCEEDED) and event is None:
-        raise RuntimeError('Event for task {0} is None'.format(task.name))
-
-    if event and event.get('exception'):
-        exception_str = str(event.get('exception'))
-    else:
-        exception_str = None
-
-    if state == tasks_api.TASK_SENDING:
-        message = "Sending task '{0}'".format(task.name)
-        event_type = 'sending_task'
-    elif state == tasks_api.TASK_STARTED:
-        message = "Task started '{0}'".format(task.name)
-        event_type = 'task_started'
-    elif state == tasks_api.TASK_SUCCEEDED:
-        result = str(event.get('result'))
-        suffix = ' ({0})'.format(result) if result not in ("'None'",
-                                                           'None') else ''
-        message = "Task succeeded '{0}{1}'".format(task.name, suffix)
-        event_type = 'task_succeeded'
-    elif state == tasks_api.TASK_RESCHEDULED:
-        message = "Task rescheduled '{0}'".format(task.name)
-        if exception_str:
-            message = '{0} -> {1}'.format(message, exception_str)
-        event_type = 'task_rescheduled'
-        task.error = exception_str
-    elif state == tasks_api.TASK_FAILED:
-        message = "Task failed '{0}'".format(task.name)
-        if exception_str:
-            message = "{0} -> {1}".format(message, exception_str)
-        event_type = 'task_failed'
-        task.error = exception_str
-    else:
-        raise RuntimeError('unhandled event type: {0}'.format(state))
-
-    if task.current_retries > 0:
-        retry = ' [retry {0}{1}]'.format(
-            task.current_retries,
-            '/{0}'.format(task.total_retries)
-            if task.total_retries >= 0 else '')
-        message = '{0}{1}'.format(message, retry)
-
-    additional_context = {
-        'task_current_retries': task.current_retries,
-        'task_total_retries': task.total_retries
-    }
-
-    if state in (tasks_api.TASK_FAILED, tasks_api.TASK_RESCHEDULED):
-        additional_context['task_error_causes'] = event.get('causes')
-
-    send_event_func(task=task,
-                    event_type=event_type,
-                    message=message,
-                    additional_context=additional_context)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c8a5c91d/aria/from_cloudify/workflows/local.py
----------------------------------------------------------------------
diff --git a/aria/from_cloudify/workflows/local.py b/aria/from_cloudify/workflows/local.py
deleted file mode 100644
index 6293edc..0000000
--- a/aria/from_cloudify/workflows/local.py
+++ /dev/null
@@ -1,598 +0,0 @@
-########
-# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#    * See the License for the specific language governing permissions and
-#    * limitations under the License.
-
-import os
-import tempfile
-import copy
-import importlib
-import shutil
-import uuid
-import json
-import threading
-from StringIO import StringIO
-from contextlib import contextmanager
-
-from cloudify_rest_client.nodes import Node
-from cloudify_rest_client.node_instances import NodeInstance
-
-from cloudify import dispatch
-from cloudify.workflows.workflow_context import (
-    DEFAULT_LOCAL_TASK_THREAD_POOL_SIZE)
-
-try:
-    from dsl_parser.constants import HOST_TYPE
-    from dsl_parser import parser as dsl_parser, tasks as dsl_tasks
-    from dsl_parser import functions as dsl_functions
-    _import_error = None
-except ImportError as e:
-    _import_error = str(e)
-    dsl_parser = None
-    dsl_tasks = None
-    dsl_functions = None
-    HOST_TYPE = None
-
-
-class _Environment(object):
-
-    def __init__(self,
-                 storage,
-                 blueprint_path=None,
-                 name='local',
-                 inputs=None,
-                 load_existing=False,
-                 ignored_modules=None,
-                 provider_context=None,
-                 resolver=None,
-                 validate_version=True):
-        self.storage = storage
-        self.storage.env = self
-
-        if load_existing:
-            self.storage.load(name)
-        else:
-            plan, nodes, node_instances = _parse_plan(blueprint_path,
-                                                      inputs,
-                                                      ignored_modules,
-                                                      resolver,
-                                                      validate_version)
-            storage.init(
-                name=name,
-                plan=plan,
-                nodes=nodes,
-                node_instances=node_instances,
-                blueprint_path=blueprint_path,
-                provider_context=provider_context)
-
-    @property
-    def plan(self):
-        return self.storage.plan
-
-    @property
-    def name(self):
-        return self.storage.name
-
-    def outputs(self):
-        return dsl_functions.evaluate_outputs(
-            outputs_def=self.plan['outputs'],
-            get_node_instances_method=self.storage.get_node_instances,
-            get_node_instance_method=self.storage.get_node_instance,
-            get_node_method=self.storage.get_node)
-
-    def evaluate_functions(self, payload, context):
-        return dsl_functions.evaluate_functions(
-            payload=payload,
-            context=context,
-            get_node_instances_method=self.storage.get_node_instances,
-            get_node_instance_method=self.storage.get_node_instance,
-            get_node_method=self.storage.get_node)
-
-    def execute(self,
-                workflow,
-                parameters=None,
-                allow_custom_parameters=False,
-                task_retries=-1,
-                task_retry_interval=30,
-                subgraph_retries=0,
-                task_thread_pool_size=DEFAULT_LOCAL_TASK_THREAD_POOL_SIZE):
-        workflows = self.plan['workflows']
-        workflow_name = workflow
-        if workflow_name not in workflows:
-            raise ValueError("'{0}' workflow does not exist. "
-                             "existing workflows are: {1}"
-                             .format(workflow_name,
-                                     workflows.keys()))
-
-        workflow = workflows[workflow_name]
-        execution_id = str(uuid.uuid4())
-        ctx = {
-            'type': 'workflow',
-            'local': True,
-            'deployment_id': self.name,
-            'blueprint_id': self.name,
-            'execution_id': execution_id,
-            'workflow_id': workflow_name,
-            'storage': self.storage,
-            'task_retries': task_retries,
-            'task_retry_interval': task_retry_interval,
-            'subgraph_retries': subgraph_retries,
-            'local_task_thread_pool_size': task_thread_pool_size,
-            'task_name': workflow['operation']
-        }
-
-        merged_parameters = _merge_and_validate_execution_parameters(
-            workflow, workflow_name, parameters, allow_custom_parameters)
-
-        return dispatch.dispatch(__cloudify_context=ctx, **merged_parameters)
-
-
-def init_env(blueprint_path,
-             name='local',
-             inputs=None,
-             storage=None,
-             ignored_modules=None,
-             provider_context=None,
-             resolver=None,
-             validate_version=True):
-    if storage is None:
-        storage = InMemoryStorage()
-    return _Environment(storage=storage,
-                        blueprint_path=blueprint_path,
-                        name=name,
-                        inputs=inputs,
-                        load_existing=False,
-                        ignored_modules=ignored_modules,
-                        provider_context=provider_context,
-                        resolver=resolver,
-                        validate_version=validate_version)
-
-
-def load_env(name, storage, resolver=None):
-    return _Environment(storage=storage,
-                        name=name,
-                        load_existing=True,
-                        resolver=resolver)
-
-
-def _parse_plan(blueprint_path, inputs, ignored_modules, resolver,
-                validate_version):
-    if dsl_parser is None:
-        raise ImportError('cloudify-dsl-parser must be installed to '
-                          'execute local workflows. '
-                          '(e.g. "pip install cloudify-dsl-parser") [{0}]'
-                          .format(_import_error))
-    plan = dsl_tasks.prepare_deployment_plan(
-        dsl_parser.parse_from_path(
-            dsl_file_path=blueprint_path,
-            resolver=resolver,
-            validate_version=validate_version),
-        inputs=inputs)
-    nodes = [Node(node) for node in plan['nodes']]
-    node_instances = [NodeInstance(instance)
-                      for instance in plan['node_instances']]
-    _prepare_nodes_and_instances(nodes, node_instances, ignored_modules)
-    return plan, nodes, node_instances
-
-
-def _validate_node(node):
-    if HOST_TYPE in node['type_hierarchy']:
-        install_agent_prop = node.properties.get('install_agent')
-        if install_agent_prop:
-            raise ValueError("'install_agent': true is not supported "
-                             "(it is True by default) "
-                             "when executing local workflows. "
-                             "The 'install_agent' property "
-                             "must be set to false for each node of type {0}."
-                             .format(HOST_TYPE))
-
-
-def _prepare_nodes_and_instances(nodes, node_instances, ignored_modules):
-
-    def scan(parent, name, node):
-        for operation in parent.get(name, {}).values():
-            if not operation['operation']:
-                continue
-            _get_module_method(operation['operation'],
-                               tpe=name,
-                               node_name=node.id,
-                               ignored_modules=ignored_modules)
-
-    for node in nodes:
-        scalable = node['capabilities']['scalable']['properties']
-        node.update(dict(
-            number_of_instances=scalable['current_instances'],
-            deploy_number_of_instances=scalable['default_instances'],
-            min_number_of_instances=scalable['min_instances'],
-            max_number_of_instances=scalable['max_instances'],
-        ))
-        if 'relationships' not in node:
-            node['relationships'] = []
-        scan(node, 'operations', node)
-        _validate_node(node)
-        for relationship in node['relationships']:
-            scan(relationship, 'source_operations', node)
-            scan(relationship, 'target_operations', node)
-
-    for node_instance in node_instances:
-        node_instance['version'] = 0
-        node_instance['runtime_properties'] = {}
-        node_instance['node_id'] = node_instance['name']
-        if 'relationships' not in node_instance:
-            node_instance['relationships'] = []
-
-
-def _get_module_method(module_method_path, tpe, node_name,
-                       ignored_modules=None):
-    ignored_modules = ignored_modules or []
-    split = module_method_path.split('.')
-    module_name = '.'.join(split[:-1])
-    if module_name in ignored_modules:
-        return None
-    method_name = split[-1]
-    try:
-        module = importlib.import_module(module_name)
-    except ImportError:
-        raise ImportError('mapping error: No module named {0} '
-                          '[node={1}, type={2}]'
-                          .format(module_name, node_name, tpe))
-    try:
-        return getattr(module, method_name)
-    except AttributeError:
-        raise AttributeError("mapping error: {0} has no attribute '{1}' "
-                             "[node={2}, type={3}]"
-                             .format(module.__name__, method_name,
-                                     node_name, tpe))
-
-
-def _try_convert_from_str(string, target_type):
-    if target_type == basestring:
-        return string
-    if target_type == bool:
-        if string.lower() == 'true':
-            return True
-        if string.lower() == 'false':
-            return False
-        return string
-    try:
-        return target_type(string)
-    except ValueError:
-        return string
-
-
-def _merge_and_validate_execution_parameters(
-        workflow, workflow_name, execution_parameters=None,
-        allow_custom_parameters=False):
-
-    merged_parameters = {}
-    workflow_parameters = workflow.get('parameters', {})
-    execution_parameters = execution_parameters or {}
-
-    missing_mandatory_parameters = set()
-
-    allowed_types = {
-        'integer': int,
-        'float': float,
-        'string': basestring,
-        'boolean': bool
-    }
-    wrong_types = {}
-
-    for name, param in workflow_parameters.iteritems():
-
-        if 'type' in param and name in execution_parameters:
-
-            # check if need to convert from string
-            if (isinstance(execution_parameters[name], basestring) and
-                    param['type'] in allowed_types):
-                execution_parameters[name] = \
-                    _try_convert_from_str(
-                        execution_parameters[name],
-                        allowed_types[param['type']])
-
-            # validate type
-            if not isinstance(execution_parameters[name],
-                              allowed_types.get(param['type'], object)):
-                wrong_types[name] = param['type']
-
-        if 'default' not in param:
-            if name not in execution_parameters:
-                missing_mandatory_parameters.add(name)
-                continue
-            merged_parameters[name] = execution_parameters[name]
-        else:
-            merged_parameters[name] = execution_parameters[name] if \
-                name in execution_parameters else param['default']
-
-    if missing_mandatory_parameters:
-        raise ValueError(
-            'Workflow "{0}" must be provided with the following '
-            'parameters to execute: {1}'
-            .format(workflow_name, ','.join(missing_mandatory_parameters)))
-
-    if wrong_types:
-        error_message = StringIO()
-        for param_name, param_type in wrong_types.iteritems():
-            error_message.write('Parameter "{0}" must be of type {1}\n'.
-                                format(param_name, param_type))
-        raise ValueError(error_message.getvalue())
-
-    custom_parameters = dict(
-        (k, v) for (k, v) in execution_parameters.iteritems()
-        if k not in workflow_parameters)
-
-    if not allow_custom_parameters and custom_parameters:
-        raise ValueError(
-            'Workflow "{0}" does not have the following parameters '
-            'declared: {1}. Remove these parameters or use '
-            'the flag for allowing custom parameters'
-            .format(workflow_name, ','.join(custom_parameters.keys())))
-
-    merged_parameters.update(custom_parameters)
-    return merged_parameters
-
-
-class _Storage(object):
-
-    def __init__(self):
-        self.name = None
-        self.resources_root = None
-        self.plan = None
-        self._nodes = None
-        self._locks = None
-        self.env = None
-        self._provider_context = None
-
-    def init(self, name, plan, nodes, node_instances, blueprint_path,
-             provider_context):
-        self.name = name
-        self.resources_root = os.path.dirname(os.path.abspath(blueprint_path))
-        self.plan = plan
-        self._provider_context = provider_context or {}
-        self._init_locks_and_nodes(nodes)
-
-    def _init_locks_and_nodes(self, nodes):
-        self._nodes = dict((node.id, node) for node in nodes)
-        self._locks = dict((instance_id, threading.RLock()) for instance_id
-                           in self._instance_ids())
-
-    def load(self, name):
-        raise NotImplementedError()
-
-    def get_resource(self, resource_path):
-        with open(os.path.join(self.resources_root, resource_path)) as f:
-            return f.read()
-
-    def download_resource(self, resource_path, target_path=None):
-        if not target_path:
-            suffix = '-{0}'.format(os.path.basename(resource_path))
-            target_path = tempfile.mktemp(suffix=suffix)
-        resource = self.get_resource(resource_path)
-        with open(target_path, 'wb') as f:
-            f.write(resource)
-        return target_path
-
-    def update_node_instance(self,
-                             node_instance_id,
-                             version,
-                             runtime_properties=None,
-                             state=None):
-        with self._lock(node_instance_id):
-            instance = self._get_node_instance(node_instance_id)
-            if state is None and version != instance['version']:
-                raise StorageConflictError('version {0} does not match '
-                                           'current version of '
-                                           'node instance {1} which is {2}'
-                                           .format(version,
-                                                   node_instance_id,
-                                                   instance['version']))
-            else:
-                instance['version'] += 1
-            if runtime_properties is not None:
-                instance['runtime_properties'] = runtime_properties
-            if state is not None:
-                instance['state'] = state
-            self._store_instance(instance)
-
-    def _get_node_instance(self, node_instance_id):
-        instance = self._load_instance(node_instance_id)
-        if instance is None:
-            raise RuntimeError('Instance {0} does not exist'
-                               .format(node_instance_id))
-        return instance
-
-    def get_node(self, node_id):
-        node = self._nodes.get(node_id)
-        if node is None:
-            raise RuntimeError('Node {0} does not exist'
-                               .format(node_id))
-        return copy.deepcopy(node)
-
-    def get_nodes(self):
-        return copy.deepcopy(self._nodes.values())
-
-    def get_node_instance(self, node_instance_id):
-        return copy.deepcopy(self._get_node_instance(node_instance_id))
-
-    def get_provider_context(self):
-        return copy.deepcopy(self._provider_context)
-
-    def _load_instance(self, node_instance_id):
-        raise NotImplementedError()
-
-    def _store_instance(self, node_instance):
-        raise NotImplementedError()
-
-    def get_node_instances(self, node_id=None):
-        raise NotImplementedError()
-
-    def _instance_ids(self):
-        raise NotImplementedError()
-
-    def _lock(self, node_instance_id):
-        return self._locks[node_instance_id]
-
-    def get_workdir(self):
-        raise NotImplementedError()
-
-
-class InMemoryStorage(_Storage):
-
-    def __init__(self):
-        super(InMemoryStorage, self).__init__()
-        self._node_instances = None
-
-    def init(self, name, plan, nodes, node_instances, blueprint_path,
-             provider_context):
-        self.plan = plan
-        self._node_instances = dict((instance.id, instance)
-                                    for instance in node_instances)
-        super(InMemoryStorage, self).init(name, plan, nodes, node_instances,
-                                          blueprint_path, provider_context)
-
-    def load(self, name):
-        raise NotImplementedError('load is not implemented by memory storage')
-
-    def _load_instance(self, node_instance_id):
-        return self._node_instances.get(node_instance_id)
-
-    def _store_instance(self, node_instance):
-        pass
-
-    def get_node_instances(self, node_id=None):
-        instances = self._node_instances.values()
-        if node_id:
-            instances = [i for i in instances if i.node_id == node_id]
-        return copy.deepcopy(instances)
-
-    def _instance_ids(self):
-        return self._node_instances.keys()
-
-    def get_workdir(self):
-        raise NotImplementedError('get_workdir is not implemented by memory '
-                                  'storage')
-
-
-class FileStorage(_Storage):
-
-    def __init__(self, storage_dir='/tmp/cloudify-workflows'):
-        super(FileStorage, self).__init__()
-        self._root_storage_dir = os.path.join(storage_dir)
-        self._storage_dir = None
-        self._workdir = None
-        self._instances_dir = None
-        self._data_path = None
-        self._payload_path = None
-        self._blueprint_path = None
-
-    def init(self, name, plan, nodes, node_instances, blueprint_path,
-             provider_context):
-        storage_dir = os.path.join(self._root_storage_dir, name)
-        workdir = os.path.join(storage_dir, 'work')
-        instances_dir = os.path.join(storage_dir, 'node-instances')
-        data_path = os.path.join(storage_dir, 'data')
-        payload_path = os.path.join(storage_dir, 'payload')
-        os.makedirs(storage_dir)
-        os.mkdir(instances_dir)
-        os.mkdir(workdir)
-        with open(payload_path, 'w') as f:
-            f.write(json.dumps({}))
-
-        blueprint_filename = os.path.basename(os.path.abspath(blueprint_path))
-        with open(data_path, 'w') as f:
-            f.write(json.dumps({
-                'plan': plan,
-                'blueprint_filename': blueprint_filename,
-                'nodes': nodes,
-                'provider_context': provider_context or {}
-            }))
-        resources_root = os.path.dirname(os.path.abspath(blueprint_path))
-        self.resources_root = os.path.join(storage_dir, 'resources')
-
-        def ignore(src, names):
-            return names if os.path.abspath(self.resources_root) == src \
-                else set()
-        shutil.copytree(resources_root, self.resources_root, ignore=ignore)
-        self._instances_dir = instances_dir
-        for instance in node_instances:
-            self._store_instance(instance, lock=False)
-        self.load(name)
-
-    def load(self, name):
-        self.name = name
-        self._storage_dir = os.path.join(self._root_storage_dir, name)
-        self._workdir = os.path.join(self._storage_dir, 'workdir')
-        self._instances_dir = os.path.join(self._storage_dir, 'node-instances')
-        self._payload_path = os.path.join(self._storage_dir, 'payload')
-        self._data_path = os.path.join(self._storage_dir, 'data')
-        with open(self._data_path) as f:
-            data = json.loads(f.read())
-        self.plan = data['plan']
-        self.resources_root = os.path.join(self._storage_dir, 'resources')
-        self._blueprint_path = os.path.join(self.resources_root,
-                                            data['blueprint_filename'])
-        self._provider_context = data.get('provider_context', {})
-        nodes = [Node(node) for node in data['nodes']]
-        self._init_locks_and_nodes(nodes)
-
-    @contextmanager
-    def payload(self):
-        with open(self._payload_path, 'r') as f:
-            payload = json.load(f)
-            yield payload
-        with open(self._payload_path, 'w') as f:
-            json.dump(payload, f, indent=2)
-            f.write(os.linesep)
-
-    def get_blueprint_path(self):
-        return self._blueprint_path
-
-    def get_node_instance(self, node_instance_id):
-        return self._get_node_instance(node_instance_id)
-
-    def _load_instance(self, node_instance_id):
-        with self._lock(node_instance_id):
-            with open(self._instance_path(node_instance_id)) as f:
-                return NodeInstance(json.loads(f.read()))
-
-    def _store_instance(self, node_instance, lock=True):
-        instance_lock = None
-        if lock:
-            instance_lock = self._lock(node_instance.id)
-            instance_lock.acquire()
-        try:
-            with open(self._instance_path(node_instance.id), 'w') as f:
-                f.write(json.dumps(node_instance))
-        finally:
-            if lock and instance_lock:
-                instance_lock.release()
-
-    def _instance_path(self, node_instance_id):
-        return os.path.join(self._instances_dir, node_instance_id)
-
-    def get_node_instances(self, node_id=None):
-        instances = [self._get_node_instance(instance_id)
-                     for instance_id in self._instance_ids()]
-        if node_id:
-            instances = [i for i in instances if i.node_id == node_id]
-        return instances
-
-    def _instance_ids(self):
-        return os.listdir(self._instances_dir)
-
-    def get_workdir(self):
-        return self._workdir
-
-
-class StorageConflictError(Exception):
-    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c8a5c91d/aria/from_cloudify/workflows/tasks.py
----------------------------------------------------------------------
diff --git a/aria/from_cloudify/workflows/tasks.py b/aria/from_cloudify/workflows/tasks.py
deleted file mode 100644
index bf676f1..0000000
--- a/aria/from_cloudify/workflows/tasks.py
+++ /dev/null
@@ -1,767 +0,0 @@
-########
-# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#    * See the License for the specific language governing permissions and
-#    * limitations under the License.
-
-
-import sys
-import time
-import uuid
-import Queue
-
-from cloudify import utils
-from cloudify import exceptions
-from cloudify.workflows import api
-
-INFINITE_TOTAL_RETRIES = -1
-DEFAULT_TOTAL_RETRIES = INFINITE_TOTAL_RETRIES
-DEFAULT_RETRY_INTERVAL = 30
-DEFAULT_SUBGRAPH_TOTAL_RETRIES = 0
-
-DEFAULT_SEND_TASK_EVENTS = True
-
-TASK_PENDING = 'pending'
-TASK_SENDING = 'sending'
-TASK_SENT = 'sent'
-TASK_STARTED = 'started'
-TASK_RESCHEDULED = 'rescheduled'
-TASK_SUCCEEDED = 'succeeded'
-TASK_FAILED = 'failed'
-
-TERMINATED_STATES = [TASK_RESCHEDULED, TASK_SUCCEEDED, TASK_FAILED]
-
-DISPATCH_TASK = 'cloudify.dispatch.dispatch'
-
-INSPECT_TIMEOUT = 30
-
-
-def retry_failure_handler(task):
-    """Basic on_success/on_failure handler that always returns retry"""
-    return HandlerResult.retry()
-
-
-class WorkflowTask(object):
-    """A base class for workflow tasks"""
-
-    def __init__(self,
-                 workflow_context,
-                 task_id=None,
-                 info=None,
-                 on_success=None,
-                 on_failure=None,
-                 total_retries=DEFAULT_TOTAL_RETRIES,
-                 retry_interval=DEFAULT_RETRY_INTERVAL,
-                 send_task_events=DEFAULT_SEND_TASK_EVENTS):
-        """
-        :param task_id: The id of this task (generated if none is provided)
-        :param info: A short description of this task (for logging)
-        :param on_success: A handler called when the task's execution
-                           terminates successfully.
-                           Expected to return one of
-                           [HandlerResult.retry(), HandlerResult.cont()]
-                           to indicate whether this task should be re-executed.
-        :param on_failure: A handler called when the task's execution
-                           fails.
-                           Expected to return one of
-                           [HandlerResult.retry(), HandlerResult.ignore(),
-                            HandlerResult.fail()]
-                           to indicate whether this task should be re-executed,
-                           cause the engine to terminate workflow execution
-                           immediately or simply ignore this task failure and
-                           move on.
-        :param total_retries: Maximum retry attempt for this task, in case
-                              the handlers return a retry attempt.
-        :param retry_interval: Number of seconds to wait between retries
-        :param workflow_context: the CloudifyWorkflowContext instance
-        """
-        self.id = task_id or str(uuid.uuid4())
-        self._state = TASK_PENDING
-        self.async_result = None
-        self.on_success = on_success
-        self.on_failure = on_failure
-        self.info = info
-        self.error = None
-        self.total_retries = total_retries
-        self.retry_interval = retry_interval
-        self.terminated = Queue.Queue(maxsize=1)
-        self.is_terminated = False
-        self.workflow_context = workflow_context
-        self.send_task_events = send_task_events
-        self.containing_subgraph = None
-
-        self.current_retries = 0
-        # timestamp for which the task should not be executed
-        # by the task graph before reached, overridden by the task
-        # graph during retries
-        self.execute_after = time.time()
-
-    def dump(self):
-        return {
-            'id': self.id,
-            'state': self.get_state(),
-            'info': self.info,
-            'error': self.error,
-            'current_retries': self.current_retries,
-            'cloudify_context': self.cloudify_context
-        }
-
-    def is_remote(self):
-        """
-        :return: Is this a remote task
-        """
-        return not self.is_local()
-
-    def is_local(self):
-        """
-        :return: Is this a local task
-        """
-        raise NotImplementedError('Implemented by subclasses')
-
-    def is_nop(self):
-        """
-        :return: Is this a NOP task
-        """
-        return False
-
-    def get_state(self):
-        """
-        Get the task state
-
-        :return: The task state [pending, sending, sent, started,
-                                 rescheduled, succeeded, failed]
-        """
-        return self._state
-
-    def set_state(self, state):
-        """
-        Set the task state
-
-        :param state: The state to set [pending, sending, sent, started,
-                                        rescheduled, succeeded, failed]
-        """
-        if state not in [TASK_PENDING, TASK_SENDING, TASK_SENT, TASK_STARTED,
-                         TASK_RESCHEDULED, TASK_SUCCEEDED, TASK_FAILED]:
-            raise RuntimeError('Illegal state set on task: {0} '
-                               '[task={1}]'.format(state, str(self)))
-        self._state = state
-        if state in TERMINATED_STATES:
-            self.is_terminated = True
-            self.terminated.put_nowait(True)
-
-    def wait_for_terminated(self, timeout=None):
-        if self.is_terminated:
-            return
-        self.terminated.get(timeout=timeout)
-
-    def handle_task_terminated(self):
-        if self.get_state() in (TASK_FAILED, TASK_RESCHEDULED):
-            handler_result = self._handle_task_not_succeeded()
-        else:
-            handler_result = self._handle_task_succeeded()
-
-        if handler_result.action == HandlerResult.HANDLER_RETRY:
-            if any([self.total_retries == INFINITE_TOTAL_RETRIES,
-                    self.current_retries < self.total_retries,
-                    handler_result.ignore_total_retries]):
-                if handler_result.retry_after is None:
-                    handler_result.retry_after = self.retry_interval
-                if handler_result.retried_task is None:
-                    new_task = self.duplicate_for_retry(
-                        time.time() + handler_result.retry_after)
-                    handler_result.retried_task = new_task
-            else:
-                handler_result.action = HandlerResult.HANDLER_FAIL
-
-        if self.containing_subgraph:
-            subgraph = self.containing_subgraph
-            retried_task = None
-            if handler_result.action == HandlerResult.HANDLER_FAIL:
-                handler_result.action = HandlerResult.HANDLER_IGNORE
-                # It is possible that two concurrent tasks failed.
-                # we will only consider the first one handled
-                if not subgraph.failed_task:
-                    subgraph.failed_task = self
-                    subgraph.set_state(TASK_FAILED)
-            elif handler_result.action == HandlerResult.HANDLER_RETRY:
-                retried_task = handler_result.retried_task
-            subgraph.task_terminated(task=self, new_task=retried_task)
-
-        return handler_result
-
-    def _handle_task_succeeded(self):
-        """Call handler for task success"""
-        if self.on_success:
-            return self.on_success(self)
-        else:
-            return HandlerResult.cont()
-
-    def _handle_task_not_succeeded(self):
-
-        """
-        Call handler for task which hasn't ended in 'succeeded' state
-        (i.e. has either failed or been rescheduled)
-        """
-
-        try:
-            exception = self.async_result.result
-        except Exception as e:
-            exception = exceptions.NonRecoverableError(
-                'Could not de-serialize '
-                'exception of task {0} --> {1}: {2}'
-                .format(self.name,
-                        type(e).__name__,
-                        str(e)))
-
-        if isinstance(exception, exceptions.OperationRetry):
-            # operation explicitly requested a retry, so we ignore
-            # the handler set on the task.
-            handler_result = HandlerResult.retry()
-        elif self.on_failure:
-            handler_result = self.on_failure(self)
-        else:
-            handler_result = HandlerResult.retry()
-
-        if handler_result.action == HandlerResult.HANDLER_RETRY:
-            if isinstance(exception, exceptions.NonRecoverableError):
-                handler_result = HandlerResult.fail()
-            elif isinstance(exception, exceptions.RecoverableError):
-                handler_result.retry_after = exception.retry_after
-
-        if not self.is_subgraph:
-            causes = []
-            if isinstance(exception, (exceptions.RecoverableError,
-                                      exceptions.NonRecoverableError)):
-                causes = exception.causes or []
-            if isinstance(self, LocalWorkflowTask):
-                tb = self.async_result._holder.error[1]
-                causes.append(utils.exception_to_error_cause(exception, tb))
-            self.workflow_context.internal.send_task_event(
-                state=self.get_state(),
-                task=self,
-                event={'exception': exception, 'causes': causes})
-
-        return handler_result
-
-    def __str__(self):
-        suffix = self.info if self.info is not None else ''
-        return '{0}({1})'.format(self.name, suffix)
-
-    def duplicate_for_retry(self, execute_after):
-        """
-        :return: A new instance of this task with a new task id
-        """
-        dup = self._duplicate()
-        dup.execute_after = execute_after
-        dup.current_retries = self.current_retries + 1
-        if dup.cloudify_context and 'operation' in dup.cloudify_context:
-            op_ctx = dup.cloudify_context['operation']
-            op_ctx['retry_number'] = dup.current_retries
-        return dup
-
-    def _duplicate(self):
-        raise NotImplementedError('Implemented by subclasses')
-
-    @property
-    def cloudify_context(self):
-        raise NotImplementedError('Implemented by subclasses')
-
-    @property
-    def name(self):
-        """
-        :return: The task name
-        """
-
-        raise NotImplementedError('Implemented by subclasses')
-
-    @property
-    def is_subgraph(self):
-        return False
-
-
-class RemoteWorkflowTask(WorkflowTask):
-    """A WorkflowTask wrapping a celery based task"""
-
-    # cache for registered tasks queries to celery workers
-    cache = {}
-
-    def __init__(self,
-                 kwargs,
-                 cloudify_context,
-                 workflow_context,
-                 task_queue=None,
-                 task_target=None,
-                 task_id=None,
-                 info=None,
-                 on_success=None,
-                 on_failure=retry_failure_handler,
-                 total_retries=DEFAULT_TOTAL_RETRIES,
-                 retry_interval=DEFAULT_RETRY_INTERVAL,
-                 send_task_events=DEFAULT_SEND_TASK_EVENTS):
-        """
-        :param kwargs: The keyword argument this task will be invoked with
-        :param cloudify_context: the cloudify context dict
-        :param task_queue: the cloudify context dict
-        :param task_target: the cloudify context dict
-        :param task_id: The id of this task (generated if none is provided)
-        :param info: A short description of this task (for logging)
-        :param on_success: A handler called when the task's execution
-                           terminates successfully.
-                           Expected to return one of
-                           [HandlerResult.retry(), HandlerResult.cont()]
-                           to indicate whether this task should be re-executed.
-        :param on_failure: A handler called when the task's execution
-                           fails.
-                           Expected to return one of
-                           [HandlerResult.retry(), HandlerResult.ignore(),
-                            HandlerResult.fail()]
-                           to indicate whether this task should be re-executed,
-                           cause the engine to terminate workflow execution
-                           immediately or simply ignore this task failure and
-                           move on.
-        :param total_retries: Maximum retry attempt for this task, in case
-                              the handlers return a retry attempt.
-        :param retry_interval: Number of seconds to wait between retries
-        :param workflow_context: the CloudifyWorkflowContext instance
-        """
-        super(RemoteWorkflowTask, self).__init__(
-            workflow_context,
-            task_id,
-            info=info,
-            on_success=on_success,
-            on_failure=on_failure,
-            total_retries=total_retries,
-            retry_interval=retry_interval,
-            send_task_events=send_task_events)
-        self._task_target = task_target
-        self._task_queue = task_queue
-        self._kwargs = kwargs
-        self._cloudify_context = cloudify_context
-
-    def apply_async(self):
-        """
-        Call the underlying celery tasks apply_async. Verify the worker
-        is alive and send an event before doing so.
-
-        :return: a RemoteWorkflowTaskResult instance wrapping the
-                 celery async result
-        """
-        try:
-            task, self._task_queue, self._task_target = \
-                self.workflow_context.internal.handler.get_task(
-                    self, queue=self._task_queue, target=self._task_target)
-            self._verify_worker_alive()
-            self.workflow_context.internal.send_task_event(TASK_SENDING, self)
-            self.set_state(TASK_SENT)
-            async_result = task.apply_async(task_id=self.id)
-            self.async_result = RemoteWorkflowTaskResult(self, async_result)
-        except (exceptions.NonRecoverableError,
-                exceptions.RecoverableError) as e:
-            self.set_state(TASK_FAILED)
-            self.async_result = RemoteWorkflowErrorTaskResult(self, e)
-        return self.async_result
-
-    def is_local(self):
-        return False
-
-    def _duplicate(self):
-        dup = RemoteWorkflowTask(kwargs=self._kwargs,
-                                 task_queue=self.queue,
-                                 task_target=self.target,
-                                 cloudify_context=self.cloudify_context,
-                                 workflow_context=self.workflow_context,
-                                 task_id=None,  # we want a new task id
-                                 info=self.info,
-                                 on_success=self.on_success,
-                                 on_failure=self.on_failure,
-                                 total_retries=self.total_retries,
-                                 retry_interval=self.retry_interval,
-                                 send_task_events=self.send_task_events)
-        dup.cloudify_context['task_id'] = dup.id
-        return dup
-
-    @property
-    def name(self):
-        """The task name"""
-        return self.cloudify_context['task_name']
-
-    @property
-    def cloudify_context(self):
-        return self._cloudify_context
-
-    @property
-    def target(self):
-        """The task target (worker name)"""
-        return self._task_target
-
-    @property
-    def queue(self):
-        """The task queue"""
-        return self._task_queue
-
-    @property
-    def kwargs(self):
-        """kwargs to pass when invoking the task"""
-        return self._kwargs
-
-    def _verify_worker_alive(self):
-        verify_worker_alive(self.name,
-                            self.target,
-                            self._get_registered)
-
-    def _get_registered(self):
-        # import here because this only applies in remote execution
-        # environments
-        from cloudify_agent.app import app
-
-        worker_name = 'celery@{0}'.format(self.target)
-        inspect = app.control.inspect(destination=[worker_name],
-                                      timeout=INSPECT_TIMEOUT)
-        registered = inspect.registered()
-        if registered is None or worker_name not in registered:
-            return None
-        return set(registered[worker_name])
-
-
-class LocalWorkflowTask(WorkflowTask):
-    """A WorkflowTask wrapping a local callable"""
-
-    def __init__(self,
-                 local_task,
-                 workflow_context,
-                 node=None,
-                 info=None,
-                 on_success=None,
-                 on_failure=retry_failure_handler,
-                 total_retries=DEFAULT_TOTAL_RETRIES,
-                 retry_interval=DEFAULT_RETRY_INTERVAL,
-                 send_task_events=DEFAULT_SEND_TASK_EVENTS,
-                 kwargs=None,
-                 task_id=None,
-                 name=None):
-        """
-        :param local_task: A callable
-        :param workflow_context: the CloudifyWorkflowContext instance
-        :param node: The CloudifyWorkflowNode instance (if in node context)
-        :param info: A short description of this task (for logging)
-        :param on_success: A handler called when the task's execution
-                           terminates successfully.
-                           Expected to return one of
-                           [HandlerResult.retry(), HandlerResult.cont()]
-                           to indicate whether this task should be re-executed.
-        :param on_failure: A handler called when the task's execution
-                           fails.
-                           Expected to return one of
-                           [HandlerResult.retry(), HandlerResult.ignore(),
-                            HandlerResult.fail()]
-                           to indicate whether this task should be re-executed,
-                           cause the engine to terminate workflow execution
-                           immediately or simply ignore this task failure and
-                           move on.
-        :param total_retries: Maximum retry attempt for this task, in case
-                              the handlers return a retry attempt.
-        :param retry_interval: Number of seconds to wait between retries
-        :param kwargs: Local task keyword arguments
-        :param name: optional parameter (default: local_task.__name__)
-        """
-        super(LocalWorkflowTask, self).__init__(
-            info=info,
-            on_success=on_success,
-            on_failure=on_failure,
-            total_retries=total_retries,
-            retry_interval=retry_interval,
-            task_id=task_id,
-            workflow_context=workflow_context,
-            send_task_events=send_task_events)
-        self.local_task = local_task
-        self.node = node
-        self.kwargs = kwargs or {}
-        self._name = name or local_task.__name__
-
-    def dump(self):
-        super_dump = super(LocalWorkflowTask, self).dump()
-        super_dump.update({
-            'name': self._name
-        })
-        return super_dump
-
-    def apply_async(self):
-        """
-        Execute the task in the local task thread pool
-        :return: A wrapper for the task result
-        """
-
-        def local_task_wrapper():
-            try:
-                self.workflow_context.internal.send_task_event(TASK_STARTED,
-                                                               self)
-                result = self.local_task(**self.kwargs)
-                self.workflow_context.internal.send_task_event(
-                    TASK_SUCCEEDED, self, event={'result': str(result)})
-                self.async_result._holder.result = result
-                self.set_state(TASK_SUCCEEDED)
-            except BaseException as e:
-                new_task_state = TASK_RESCHEDULED if isinstance(
-                    e, exceptions.OperationRetry) else TASK_FAILED
-                exc_type, exception, tb = sys.exc_info()
-                self.async_result._holder.error = (exception, tb)
-                self.set_state(new_task_state)
-
-        self.async_result = LocalWorkflowTaskResult(self)
-
-        self.workflow_context.internal.send_task_event(TASK_SENDING, self)
-        self.set_state(TASK_SENT)
-        self.workflow_context.internal.add_local_task(local_task_wrapper)
-
-        return self.async_result
-
-    def is_local(self):
-        return True
-
-    def _duplicate(self):
-        dup = LocalWorkflowTask(local_task=self.local_task,
-                                workflow_context=self.workflow_context,
-                                node=self.node,
-                                info=self.info,
-                                on_success=self.on_success,
-                                on_failure=self.on_failure,
-                                total_retries=self.total_retries,
-                                retry_interval=self.retry_interval,
-                                send_task_events=self.send_task_events,
-                                kwargs=self.kwargs,
-                                name=self.name)
-        return dup
-
-    @property
-    def name(self):
-        """The task name"""
-        return self._name
-
-    @property
-    def cloudify_context(self):
-        return self.kwargs.get('__cloudify_context')
-
-
-# NOP tasks class
-class NOPLocalWorkflowTask(LocalWorkflowTask):
-
-    def __init__(self, workflow_context):
-        super(NOPLocalWorkflowTask, self).__init__(lambda: None,
-                                                   workflow_context)
-
-    @property
-    def name(self):
-        """The task name"""
-        return 'NOP'
-
-    def apply_async(self):
-        self.set_state(TASK_SUCCEEDED)
-        return LocalWorkflowTaskResult(self)
-
-    def is_nop(self):
-        return True
-
-
-class WorkflowTaskResult(object):
-    """A base wrapper for workflow task results"""
-
-    def __init__(self, task):
-        self.task = task
-
-    def _process(self, retry_on_failure):
-        if self.task.workflow_context.internal.graph_mode:
-            return self._get()
-        task_graph = self.task.workflow_context.internal.task_graph
-        while True:
-            self._wait_for_task_terminated()
-            handler_result = self.task.handle_task_terminated()
-            task_graph.remove_task(self.task)
-            try:
-                result = self._get()
-                if handler_result.action != HandlerResult.HANDLER_RETRY:
-                    return result
-            except:
-                if (not retry_on_failure or
-                        handler_result.action == HandlerResult.HANDLER_FAIL):
-                    raise
-            self._sleep(handler_result.retry_after)
-            self.task = handler_result.retried_task
-            task_graph.add_task(self.task)
-            self._check_execution_cancelled()
-            self.task.apply_async()
-            self._refresh_state()
-
-    @staticmethod
-    def _check_execution_cancelled():
-        if api.has_cancel_request():
-            raise api.ExecutionCancelled()
-
-    def _wait_for_task_terminated(self):
-        while True:
-            self._check_execution_cancelled()
-            try:
-                self.task.wait_for_terminated(timeout=1)
-                break
-            except Queue.Empty:
-                continue
-
-    def _sleep(self, seconds):
-        while seconds > 0:
-            self._check_execution_cancelled()
-            sleep_time = 1 if seconds > 1 else seconds
-            time.sleep(sleep_time)
-            seconds -= sleep_time
-
-    def get(self, retry_on_failure=True):
-        """
-        Get the task result.
-        Will block until the task execution ends.
-
-        :return: The task result
-        """
-        return self._process(retry_on_failure)
-
-    def _get(self):
-        raise NotImplementedError('Implemented by subclasses')
-
-    def _refresh_state(self):
-        raise NotImplementedError('Implemented by subclasses')
-
-
-class RemoteWorkflowErrorTaskResult(WorkflowTaskResult):
-
-    def __init__(self, task, exception):
-        super(RemoteWorkflowErrorTaskResult, self).__init__(task)
-        self.exception = exception
-
-    def _get(self):
-        raise self.exception
-
-    @property
-    def result(self):
-        return self.exception
-
-
-class RemoteWorkflowTaskResult(WorkflowTaskResult):
-    """A wrapper for celery's AsyncResult"""
-
-    def __init__(self, task, async_result):
-        super(RemoteWorkflowTaskResult, self).__init__(task)
-        self.async_result = async_result
-
-    def _get(self):
-        return self.async_result.get()
-
-    def _refresh_state(self):
-        self.async_result = self.task.async_result.async_result
-
-    @property
-    def result(self):
-        return self.async_result.result
-
-
-class LocalWorkflowTaskResult(WorkflowTaskResult):
-    """A wrapper for local workflow task results"""
-
-    class ResultHolder(object):
-
-        def __init__(self, result=None, error=None):
-            self.result = result
-            self.error = error
-
-    def __init__(self, task):
-        """
-        :param task: The LocalWorkflowTask instance
-        """
-        super(LocalWorkflowTaskResult, self).__init__(task)
-        self._holder = self.ResultHolder()
-
-    def _get(self):
-        if self._holder.error is not None:
-            exception, traceback = self._holder.error
-            raise exception, None, traceback
-        return self._holder.result
-
-    def _refresh_state(self):
-        self._holder = self.task.async_result._holder
-
-    @property
-    def result(self):
-        if self._holder.error:
-            return self._holder.error[0]
-        else:
-            return self._holder.result
-
-
-class StubAsyncResult(object):
-    """Stub async result that always returns None"""
-    result = None
-
-
-class HandlerResult(object):
-
-    HANDLER_RETRY = 'handler_retry'
-    HANDLER_FAIL = 'handler_fail'
-    HANDLER_IGNORE = 'handler_ignore'
-    HANDLER_CONTINUE = 'handler_continue'
-
-    def __init__(self,
-                 action,
-                 ignore_total_retries=False,
-                 retry_after=None):
-        self.action = action
-        self.ignore_total_retries = ignore_total_retries
-        self.retry_after = retry_after
-
-        # this field is filled by handle_terminated_task() below after
-        # duplicating the task and updating the relevant task fields
-        # or by a subgraph on_XXX handler
-        self.retried_task = None
-
-    @classmethod
-    def retry(cls, ignore_total_retries=False, retry_after=None):
-        return HandlerResult(cls.HANDLER_RETRY,
-                             ignore_total_retries=ignore_total_retries,
-                             retry_after=retry_after)
-
-    @classmethod
-    def fail(cls):
-        return HandlerResult(cls.HANDLER_FAIL)
-
-    @classmethod
-    def cont(cls):
-        return HandlerResult(cls.HANDLER_CONTINUE)
-
-    @classmethod
-    def ignore(cls):
-        return HandlerResult(cls.HANDLER_IGNORE)
-
-
-def verify_worker_alive(name, target, get_registered):
-
-    cache = RemoteWorkflowTask.cache
-    registered = cache.get(target)
-    if not registered:
-        registered = get_registered()
-        cache[target] = registered
-
-    if registered is None:
-        raise exceptions.RecoverableError(
-            'Timed out querying worker celery@{0} for its registered '
-            'tasks. [timeout={1} seconds]'.format(target, INSPECT_TIMEOUT))
-
-    if DISPATCH_TASK not in registered:
-        raise exceptions.NonRecoverableError(
-            'Missing {0} task in worker {1} \n'
-            'Registered tasks are: {2}. (This probably means the agent '
-            'configuration is invalid) [{3}]'.format(
-                DISPATCH_TASK, target, registered, name))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c8a5c91d/aria/from_cloudify/workflows/tasks_graph.py
----------------------------------------------------------------------
diff --git a/aria/from_cloudify/workflows/tasks_graph.py b/aria/from_cloudify/workflows/tasks_graph.py
deleted file mode 100644
index 31e5635..0000000
--- a/aria/from_cloudify/workflows/tasks_graph.py
+++ /dev/null
@@ -1,372 +0,0 @@
-########
-# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#    * See the License for the specific language governing permissions and
-#    * limitations under the License.
-
-
-import os
-import json
-import time
-
-import networkx as nx
-
-from cloudify.workflows import api
-from cloudify.workflows import tasks
-
-
-class TaskDependencyGraph(object):
-    """
-    A task graph builder
-
-    :param workflow_context: A WorkflowContext instance (used for logging)
-    """
-
-    def __init__(self, workflow_context,
-                 default_subgraph_task_config=None):
-        self.ctx = workflow_context
-        self.graph = nx.DiGraph()
-        default_subgraph_task_config = default_subgraph_task_config or {}
-        self._default_subgraph_task_config = default_subgraph_task_config
-
-    def add_task(self, task):
-        """Add a WorkflowTask to this graph
-
-        :param task: The task
-        """
-        self.graph.add_node(task.id, task=task)
-
-    def get_task(self, task_id):
-        """Get a task instance that was inserted to this graph by its id
-
-        :param task_id: the task id
-        :return: a WorkflowTask instance for the requested task if found.
-                 None, otherwise.
-        """
-        data = self.graph.node.get(task_id)
-        return data['task'] if data is not None else None
-
-    def remove_task(self, task):
-        """Remove the provided task from the graph
-
-        :param task: The task
-        """
-        if task.is_subgraph:
-            for subgraph_task in task.tasks.values():
-                self.remove_task(subgraph_task)
-        if task.id in self.graph:
-            self.graph.remove_node(task.id)
-
-    # src depends on dst
-    def add_dependency(self, src_task, dst_task):
-        """
-        Add a dependency between tasks.
-        The source task will only be executed after the target task terminates.
-        A task may depend on several tasks, in which case it will only be
-        executed after all its 'destination' tasks terminate
-
-        :param src_task: The source task
-        :param dst_task: The target task
-        """
-        if not self.graph.has_node(src_task.id):
-            raise RuntimeError('source task {0} is not in graph (task id: '
-                               '{1})'.format(src_task, src_task.id))
-        if not self.graph.has_node(dst_task.id):
-            raise RuntimeError('destination task {0} is not in graph (task '
-                               'id: {1})'.format(dst_task, dst_task.id))
-        self.graph.add_edge(src_task.id, dst_task.id)
-
-    def sequence(self):
-        """
-        :return: a new TaskSequence for this graph
-        """
-        return TaskSequence(self)
-
-    def subgraph(self, name):
-        task = SubgraphTask(name, self, **self._default_subgraph_task_config)
-        self.add_task(task)
-        return task
-
-    def execute(self):
-        """
-        Start executing the graph based on tasks and dependencies between
-        them.
-        Calling this method will block until one of the following occurs:
-            1. all tasks terminated
-            2. a task failed
-            3. an unhandled exception is raised
-            4. the execution is cancelled
-
-        Note: This method will raise an api.ExecutionCancelled error if the
-        execution has been cancelled. When catching errors raised from this
-        method, make sure to re-raise the error if it's
-        api.ExecutionsCancelled in order to allow the execution to be set in
-        cancelled mode properly.
-
-        Also note that for the time being, if such a cancelling event
-        occurs, the method might return even while there's some operations
-        still being executed.
-        """
-
-        while True:
-
-            if self._is_execution_cancelled():
-                raise api.ExecutionCancelled()
-
-            self._check_dump_request()
-
-            # handle all terminated tasks
-            # it is important this happens before handling
-            # executable tasks so we get to make tasks executable
-            # and then execute them in this iteration (otherwise, it would
-            # be the next one)
-            for task in self._terminated_tasks():
-                self._handle_terminated_task(task)
-
-            # handle all executable tasks
-            for task in self._executable_tasks():
-                self._handle_executable_task(task)
-
-            # no more tasks to process, time to move on
-            if len(self.graph.node) == 0:
-                return
-            # sleep some and do it all over again
-            else:
-                time.sleep(0.1)
-
-    @staticmethod
-    def _is_execution_cancelled():
-        return api.has_cancel_request()
-
-    def _executable_tasks(self):
-        """
-        A task is executable if it is in pending state
-        , it has no dependencies at the moment (i.e. all of its dependencies
-        already terminated) and its execution timestamp is smaller then the
-        current timestamp
-
-        :return: An iterator for executable tasks
-        """
-        now = time.time()
-        return (task for task in self.tasks_iter()
-                if task.get_state() == tasks.TASK_PENDING and
-                task.execute_after <= now and
-                not (task.containing_subgraph and
-                     task.containing_subgraph.get_state() ==
-                     tasks.TASK_FAILED) and
-                not self._task_has_dependencies(task))
-
-    def _terminated_tasks(self):
-        """
-        A task is terminated if it is in 'succeeded' or 'failed' state
-
-        :return: An iterator for terminated tasks
-        """
-        return (task for task in self.tasks_iter()
-                if task.get_state() in tasks.TERMINATED_STATES)
-
-    def _task_has_dependencies(self, task):
-        """
-        :param task: The task
-        :return: Does this task have any dependencies
-        """
-        return (len(self.graph.succ.get(task.id, {})) > 0 or
-                (task.containing_subgraph and self._task_has_dependencies(
-                    task.containing_subgraph)))
-
-    def tasks_iter(self):
-        """
-        An iterator on tasks added to the graph
-        """
-        return (data['task'] for _, data in self.graph.nodes_iter(data=True))
-
-    def _handle_executable_task(self, task):
-        """Handle executable task"""
-        task.set_state(tasks.TASK_SENDING)
-        task.apply_async()
-
-    def _handle_terminated_task(self, task):
-        """Handle terminated task"""
-
-        handler_result = task.handle_task_terminated()
-        if handler_result.action == tasks.HandlerResult.HANDLER_FAIL:
-            if isinstance(task, SubgraphTask) and task.failed_task:
-                task = task.failed_task
-            message = "Workflow failed: Task failed '{0}'".format(task.name)
-            if task.error:
-                message = '{0} -> {1}'.format(message, task.error)
-            raise RuntimeError(message)
-
-        dependents = self.graph.predecessors(task.id)
-        removed_edges = [(dependent, task.id)
-                         for dependent in dependents]
-        self.graph.remove_edges_from(removed_edges)
-        self.graph.remove_node(task.id)
-        if handler_result.action == tasks.HandlerResult.HANDLER_RETRY:
-            new_task = handler_result.retried_task
-            self.add_task(new_task)
-            added_edges = [(dependent, new_task.id)
-                           for dependent in dependents]
-            self.graph.add_edges_from(added_edges)
-
-    def _check_dump_request(self):
-        task_dump = os.environ.get('WORKFLOW_TASK_DUMP')
-        if not (task_dump and os.path.exists(task_dump)):
-            return
-        os.remove(task_dump)
-        task_dump_path = '{0}.{1}'.format(task_dump, time.time())
-        with open(task_dump_path, 'w') as f:
-            f.write(json.dumps({
-                'tasks': [task.dump() for task in self.tasks_iter()],
-                'edges': [[s, t] for s, t in self.graph.edges_iter()]}))
-
-
-class forkjoin(object):
-    """
-    A simple wrapper for tasks. Used in conjunction with TaskSequence.
-    Defined to make the code easier to read (instead of passing a list)
-    see ``TaskSequence.add`` for more details
-    """
-
-    def __init__(self, *tasks):
-        self.tasks = tasks
-
-
-class TaskSequence(object):
-    """
-    Helper class to add tasks in a sequential manner to a task dependency
-    graph
-
-    :param graph: The TaskDependencyGraph instance
-    """
-
-    def __init__(self, graph):
-        self.graph = graph
-        self.last_fork_join_tasks = None
-
-    def add(self, *tasks):
-        """
-        Add tasks to the sequence.
-
-        :param tasks: Each task might be:
-
-                      * A WorkflowTask instance, in which case, it will be
-                        added to the graph with a dependency between it and
-                        the task previously inserted into the sequence
-                      * A forkjoin of tasks, in which case it will be treated
-                        as a "fork-join" task in the sequence, i.e. all the
-                        fork-join tasks will depend on the last task in the
-                        sequence (could be fork join) and the next added task
-                        will depend on all tasks in this fork-join task
-        """
-        for fork_join_tasks in tasks:
-            if isinstance(fork_join_tasks, forkjoin):
-                fork_join_tasks = fork_join_tasks.tasks
-            else:
-                fork_join_tasks = [fork_join_tasks]
-            for task in fork_join_tasks:
-                self.graph.add_task(task)
-                if self.last_fork_join_tasks is not None:
-                    for last_fork_join_task in self.last_fork_join_tasks:
-                        self.graph.add_dependency(task, last_fork_join_task)
-            if fork_join_tasks:
-                self.last_fork_join_tasks = fork_join_tasks
-
-
-class SubgraphTask(tasks.WorkflowTask):
-
-    def __init__(self,
-                 name,
-                 graph,
-                 task_id=None,
-                 on_success=None,
-                 on_failure=None,
-                 total_retries=tasks.DEFAULT_SUBGRAPH_TOTAL_RETRIES,
-                 retry_interval=tasks.DEFAULT_RETRY_INTERVAL,
-                 send_task_events=tasks.DEFAULT_SEND_TASK_EVENTS):
-        super(SubgraphTask, self).__init__(
-            graph.ctx,
-            task_id,
-            info=name,
-            on_success=on_success,
-            on_failure=on_failure,
-            total_retries=total_retries,
-            retry_interval=retry_interval,
-            send_task_events=send_task_events)
-        self.graph = graph
-        self._name = name
-        self.tasks = {}
-        self.failed_task = None
-        if not self.on_failure:
-            self.on_failure = lambda tsk: tasks.HandlerResult.fail()
-        self.async_result = tasks.StubAsyncResult()
-
-    def _duplicate(self):
-        raise NotImplementedError('self.retried_task should be set explicitly'
-                                  ' in self.on_failure handler')
-
-    @property
-    def cloudify_context(self):
-        return {}
-
-    def is_local(self):
-        return True
-
-    @property
-    def name(self):
-        return self._name
-
-    @property
-    def is_subgraph(self):
-        return True
-
-    def sequence(self):
-        return TaskSequence(self)
-
-    def subgraph(self, name):
-        task = SubgraphTask(name, self.graph,
-                            **self.graph._default_subgraph_task_config)
-        self.add_task(task)
-        return task
-
-    def add_task(self, task):
-        self.graph.add_task(task)
-        self.tasks[task.id] = task
-        if task.containing_subgraph and task.containing_subgraph is not self:
-            raise RuntimeError('task {0}[{1}] cannot be contained in more '
-                               'than one subgraph. It is currently contained '
-                               'in {2} and it is now being added to {3}'
-                               .format(task,
-                                       task.id,
-                                       task.containing_subgraph.name,
-                                       self.name))
-        task.containing_subgraph = self
-
-    def remove_task(self, task):
-        self.graph.remove_task(task)
-
-    def add_dependency(self, src_task, dst_task):
-        self.graph.add_dependency(src_task, dst_task)
-
-    def apply_async(self):
-        if not self.tasks:
-            self.set_state(tasks.TASK_SUCCEEDED)
-        else:
-            self.set_state(tasks.TASK_STARTED)
-
-    def task_terminated(self, task, new_task=None):
-        del self.tasks[task.id]
-        if new_task:
-            self.tasks[new_task.id] = new_task
-            new_task.containing_subgraph = self
-        if not self.tasks and self.get_state() not in tasks.TERMINATED_STATES:
-            self.set_state(tasks.TASK_SUCCEEDED)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/c8a5c91d/aria/from_cloudify/workflows/workflow_api.py
----------------------------------------------------------------------
diff --git a/aria/from_cloudify/workflows/workflow_api.py b/aria/from_cloudify/workflows/workflow_api.py
deleted file mode 100644
index dc60f18..0000000
--- a/aria/from_cloudify/workflows/workflow_api.py
+++ /dev/null
@@ -1,47 +0,0 @@
-########
-# Copyright (c) 2014 GigaSpaces Technologies Ltd. All rights reserved
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#        http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-#    * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#    * See the License for the specific language governing permissions and
-#    * limitations under the License.
-
-
-EXECUTION_CANCELLED_RESULT = 'execution_cancelled'
-
-cancel_request = False
-
-
-def has_cancel_request():
-    """
-    Checks for requests to cancel the workflow execution.
-    This should be used to allow graceful termination of workflow executions.
-
-    If this method is not used and acted upon, a simple 'cancel'
-    request for the execution will have no effect - 'force-cancel' will have
-    to be used to abruptly terminate the execution instead.
-
-    Note: When this method returns True, the workflow should make the
-    appropriate cleanups and then it must raise an ExecutionCancelled error
-    if the execution indeed gets cancelled (i.e. if it's too late to cancel
-    there is no need to raise this exception and the workflow should end
-    normally).
-
-    :return: whether there was a request to cancel the workflow execution
-    """
-    return cancel_request
-
-
-class ExecutionCancelled(Exception):
-    """
-    This exception should be raised when a workflow has been cancelled,
-    once appropriate cleanups have taken place.
-    """
-    pass


Mime
View raw message