ariatosca-dev mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From mxm...@apache.org
Subject [3/5] incubator-ariatosca git commit: ARIA-48 Revamped ARIA CLI
Date Wed, 19 Apr 2017 14:11:38 GMT
http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/modeling/service_template.py
----------------------------------------------------------------------
diff --git a/aria/modeling/service_template.py b/aria/modeling/service_template.py
index 51fea2f..f1c2bcb 100644
--- a/aria/modeling/service_template.py
+++ b/aria/modeling/service_template.py
@@ -280,7 +280,7 @@ class ServiceTemplateBase(TemplateModelMixin):
             ('interface_types', formatting.as_raw(self.interface_types)),
             ('artifact_types', formatting.as_raw(self.artifact_types))))
 
-    def instantiate(self, container):
+    def instantiate(self, container, model_storage, inputs=None):  # pylint: disable=arguments-differ
         from . import models
         context = ConsumptionContext.get_thread_local()
         now = datetime.now()
@@ -288,13 +288,14 @@ class ServiceTemplateBase(TemplateModelMixin):
                                  updated_at=now,
                                  description=deepcopy_with_locators(self.description),
                                  service_template=self)
-        #service.name = '{0}_{1}'.format(self.name, service.id)
-
         context.modeling.instance = service
 
+        service.inputs = utils.create_inputs(inputs or {}, self.inputs)
+        # TODO: now that we have inputs, we should scan properties and inputs and evaluate functions
+
         for plugin_specification in self.plugin_specifications.itervalues():
             if plugin_specification.enabled:
-                if plugin_specification.resolve():
+                if plugin_specification.resolve(model_storage):
                     plugin = plugin_specification.plugin
                     service.plugins[plugin.name] = plugin
                 else:
@@ -316,15 +317,8 @@ class ServiceTemplateBase(TemplateModelMixin):
         if self.substitution_template is not None:
             service.substitution = self.substitution_template.instantiate(container)
 
-        utils.instantiate_dict(self, service.inputs, self.inputs)
         utils.instantiate_dict(self, service.outputs, self.outputs)
 
-        for name, the_input in context.modeling.inputs.iteritems():
-            if name not in service.inputs:
-                context.validation.report('input "{0}" is not supported'.format(name))
-            else:
-                service.inputs[name].value = the_input
-
         return service
 
     def validate(self):
@@ -448,8 +442,7 @@ class NodeTemplateBase(TemplateModelMixin):
     __tablename__ = 'node_template'
 
     __private_fields__ = ['type_fk',
-                          'service_template_fk',
-                          'service_template_name']
+                          'service_template_fk']
 
     # region foreign_keys
 
@@ -472,6 +465,11 @@ class NodeTemplateBase(TemplateModelMixin):
         """Required for use by SQLAlchemy queries"""
         return association_proxy('service_template', 'name')
 
+    @declared_attr
+    def type_name(cls):
+        """Required for use by SQLAlchemy queries"""
+        return association_proxy('type', 'name')
+
     # endregion
 
     # region one_to_one relationships
@@ -558,6 +556,7 @@ class NodeTemplateBase(TemplateModelMixin):
                            type=self.type,
                            description=deepcopy_with_locators(self.description),
                            state=models.Node.INITIAL,
+                           runtime_properties={},
                            node_template=self)
         utils.instantiate_dict(node, node.properties, self.properties)
         utils.instantiate_dict(node, node.interfaces, self.interface_templates)
@@ -1238,7 +1237,8 @@ class RequirementTemplateBase(TemplateModelMixin):
 
         # Find first node that matches the type
         elif self.target_node_type is not None:
-            for target_node_template in context.modeling.template.node_templates.itervalues():
+            for target_node_template in \
+                    self.node_template.service_template.node_templates.values():
                 if self.target_node_type.get_descendant(target_node_template.type.name) is None:
                     continue
 
@@ -1865,16 +1865,22 @@ class OperationTemplateBase(TemplateModelMixin):
 
     def instantiate(self, container):
         from . import models
-        if self.plugin_specification and self.plugin_specification.enabled:
-            plugin = self.plugin_specification.plugin
-            implementation = self.implementation if plugin is not None else None
-            # "plugin" would be none if a match was not found. In that case, a validation error
-            # should already have been reported in ServiceTemplateBase.instantiate, so we will
-            # continue silently here
+        if self.plugin_specification:
+            if self.plugin_specification.enabled:
+                plugin = self.plugin_specification.plugin
+                implementation = self.implementation if plugin is not None else None
+                # "plugin" would be none if a match was not found. In that case, a validation error
+                # should already have been reported in ServiceTemplateBase.instantiate, so we will
+                # continue silently here
+            else:
+                # If the plugin is disabled, the operation should be disabled, too
+                plugin = None
+                implementation = None
         else:
-            # If the plugin is disabled, the operation should be disabled, too
+            # No plugin specification: the operation uses the execution plugin
             plugin = None
-            implementation = None
+            implementation = self.implementation
+
         operation = models.Operation(name=self.name,
                                      description=deepcopy_with_locators(self.description),
                                      relationship_edge=self.relationship_edge,
@@ -2120,25 +2126,16 @@ class PluginSpecificationBase(TemplateModelMixin):
     def coerce_values(self, container, report_issues):
         pass
 
-    def resolve(self):
+    def resolve(self, model_storage):
         # TODO: we are planning a separate "instantiation" module where this will be called or
-        # moved to. There, we will probably have a context with a storage manager. Until then,
-        # this is the only potentially available context, which of course will only be available
-        # if we're in a workflow.
-        from ..orchestrator import context
-        try:
-            workflow_context = context.workflow.current.get()
-            plugins = workflow_context.model.plugin.list()
-        except context.exceptions.ContextException:
-            plugins = None
-
+        # moved to.
+        plugins = model_storage.plugin.list()
         matching_plugins = []
-        if plugins:
-            for plugin in plugins:
-                # TODO: we need to use a version comparator
-                if (plugin.name == self.name) and \
+        for plugin in plugins:
+            # TODO: we need to use a version comparator
+            if (plugin.name == self.name) and \
                     ((self.version is None) or (plugin.package_version >= self.version)):
-                    matching_plugins.append(plugin)
+                matching_plugins.append(plugin)
         self.plugin = None
         if matching_plugins:
             # Return highest version of plugin

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/modeling/utils.py
----------------------------------------------------------------------
diff --git a/aria/modeling/utils.py b/aria/modeling/utils.py
index 0b4015c..91d7b9c 100644
--- a/aria/modeling/utils.py
+++ b/aria/modeling/utils.py
@@ -13,12 +13,100 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
+from json import JSONEncoder
+from StringIO import StringIO
+
+from . import exceptions
 from ..parser.consumption import ConsumptionContext
 from ..parser.exceptions import InvalidValueError
 from ..parser.presentation import Value
 from ..utils.collections import OrderedDict
 from ..utils.console import puts
-from .exceptions import CannotEvaluateFunctionException
+from ..utils.type import validate_value_type
+
+
+class ModelJSONEncoder(JSONEncoder):
+    def default(self, o):  # pylint: disable=method-hidden
+        from .mixins import ModelMixin
+        if isinstance(o, ModelMixin):
+            if hasattr(o, 'value'):
+                dict_to_return = o.to_dict(fields=('value',))
+                return dict_to_return['value']
+            else:
+                return o.to_dict()
+        else:
+            return JSONEncoder.default(self, o)
+
+
+def create_inputs(inputs, template_inputs):
+    """
+    :param inputs: key-value dict
+    :param template_inputs: parameter name to parameter object dict
+    :return: dict of parameter name to Parameter models
+    """
+    merged_inputs = _merge_and_validate_inputs(inputs, template_inputs)
+
+    from . import models
+    input_models = []
+    for input_name, input_val in merged_inputs.iteritems():
+        parameter = models.Parameter(
+            name=input_name,
+            type_name=template_inputs[input_name].type_name,
+            description=template_inputs[input_name].description,
+            value=input_val)
+        input_models.append(parameter)
+
+    return dict((inp.name, inp) for inp in input_models)
+
+
+def _merge_and_validate_inputs(inputs, template_inputs):
+    """
+    :param inputs: key-value dict
+    :param template_inputs: parameter name to parameter object dict
+    :return: key-value dict of the given inputs merged with template default values
+    """
+    merged_inputs = inputs.copy()
+
+    missing_inputs = []
+    wrong_type_inputs = {}
+    for input_name, input_template in template_inputs.iteritems():
+        if input_name not in inputs:
+            if input_template.value is not None:
+                merged_inputs[input_name] = input_template.value  # apply default value
+            else:
+                missing_inputs.append(input_name)
+        else:
+            # Validate input type
+            try:
+                validate_value_type(inputs[input_name], input_template.type_name)
+            except ValueError:
+                wrong_type_inputs[input_name] = input_template.type_name
+            except RuntimeError:
+                # TODO: This error shouldn't be raised (or caught), but right now we lack support
+                # for custom data_types, which will raise this error. Skipping their validation.
+                pass
+
+    if missing_inputs:
+        raise exceptions.MissingRequiredInputsException(
+            'Required inputs {0} have not been specified - expected inputs: {1}'
+            .format(missing_inputs, template_inputs.keys()))
+
+    if wrong_type_inputs:
+        error_message = StringIO()
+        for param_name, param_type in wrong_type_inputs.iteritems():
+            error_message.write('Input "{0}" must be of type {1}{2}'
+                                .format(param_name, param_type, os.linesep))
+        raise exceptions.InputsOfWrongTypeException(error_message.getvalue())
+
+    undeclared_inputs = [input_name for input_name in inputs.keys()
+                         if input_name not in template_inputs]
+    if undeclared_inputs:
+        raise exceptions.UndeclaredInputsException(
+            'Undeclared inputs have been specified: {0}; Expected inputs: {1}'
+            .format(undeclared_inputs, template_inputs.keys()))
+
+    return merged_inputs
 
 
 def coerce_value(container, value, report_issues=False):
@@ -35,7 +123,7 @@ def coerce_value(container, value, report_issues=False):
         try:
             value = value._evaluate(context, container)
             value = coerce_value(container, value, report_issues)
-        except CannotEvaluateFunctionException:
+        except exceptions.CannotEvaluateFunctionException:
             pass
         except InvalidValueError as e:
             if report_issues:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/context/common.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py
index 127641f..15843db 100644
--- a/aria/orchestrator/context/common.py
+++ b/aria/orchestrator/context/common.py
@@ -19,7 +19,6 @@ A common context for both workflow and operation
 
 import logging
 from contextlib import contextmanager
-from datetime import datetime
 from functools import partial
 
 import jinja2
@@ -55,6 +54,7 @@ class BaseContext(object):
             self,
             name,
             service_id,
+            execution_id,
             model_storage,
             resource_storage,
             workdir=None,
@@ -65,27 +65,17 @@ class BaseContext(object):
         self._model = model_storage
         self._resource = resource_storage
         self._service_id = service_id
+        self._execution_id = execution_id
         self._workdir = workdir
         self.logger = None
 
-    def _create_execution(self):
-        now = datetime.utcnow()
-        execution = self.model.execution.model_cls(
-            service_instance=self.service_instance,
-            workflow_name=self._workflow_name,
-            created_at=now,
-            parameters=self.parameters,
-        )
-        self.model.execution.put(execution)
-        return execution.id
-
-    def _register_logger(self, logger_name=None, level=None, task_id=None):
-        self.logger = self.PrefixedLogger(logging.getLogger(logger_name or self.__class__.__name__),
-                                          self.logging_id,
-                                          task_id=task_id)
-        self.logger.addHandler(aria_logger.create_console_log_handler())
-        self.logger.addHandler(self._get_sqla_handler())
+    def _register_logger(self, level=None, task_id=None):
+        self.logger = self.PrefixedLogger(
+            logging.getLogger(aria_logger.TASK_LOGGER_NAME), self.logging_id, task_id=task_id)
         self.logger.setLevel(level or logging.DEBUG)
+        if not self.logger.handlers:
+            self.logger.addHandler(aria_logger.create_console_log_handler())
+            self.logger.addHandler(self._get_sqla_handler())
 
     def _get_sqla_handler(self):
         api_kwargs = {}
@@ -168,13 +158,13 @@ class BaseContext(object):
         Download a blueprint resource from the resource storage
         """
         try:
-            self.resource.deployment.download(entry_id=str(self.service.id),
-                                              destination=destination,
-                                              path=path)
+            self.resource.service.download(entry_id=str(self.service.id),
+                                           destination=destination,
+                                           path=path)
         except exceptions.StorageError:
-            self.resource.blueprint.download(entry_id=str(self.service_template.id),
-                                             destination=destination,
-                                             path=path)
+            self.resource.service_template.download(entry_id=str(self.service_template.id),
+                                                    destination=destination,
+                                                    path=path)
 
     def download_resource_and_render(self, destination, path=None, variables=None):
         """
@@ -193,9 +183,10 @@ class BaseContext(object):
         Read a deployment resource as string from the resource storage
         """
         try:
-            return self.resource.deployment.read(entry_id=str(self.service.id), path=path)
+            return self.resource.service.read(entry_id=str(self.service.id), path=path)
         except exceptions.StorageError:
-            return self.resource.deployment.read(entry_id=str(self.service_template.id), path=path)
+            return self.resource.service_template.read(entry_id=str(self.service_template.id),
+                                                       path=path)
 
     def get_resource_and_render(self, path=None, variables=None):
         """

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/context/operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py
index cbd186c..c7d8246 100644
--- a/aria/orchestrator/context/operation.py
+++ b/aria/orchestrator/context/operation.py
@@ -36,7 +36,6 @@ class BaseOperationContext(BaseContext):
                  service_id,
                  task_id,
                  actor_id,
-                 execution_id,
                  **kwargs):
         super(BaseOperationContext, self).__init__(
             name=name,
@@ -47,7 +46,6 @@ class BaseOperationContext(BaseContext):
         self._task_id = task_id
         self._actor_id = actor_id
         self._thread_local = threading.local()
-        self._execution_id = execution_id
         self._register_logger(task_id=self.task.id)
 
     def __repr__(self):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/context/workflow.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py
index 5f86d9d..667d22f 100644
--- a/aria/orchestrator/context/workflow.py
+++ b/aria/orchestrator/context/workflow.py
@@ -19,7 +19,6 @@ Workflow and operation contexts
 
 import threading
 from contextlib import contextmanager
-from datetime import datetime
 
 from .exceptions import ContextException
 from .common import BaseContext
@@ -35,36 +34,21 @@ class WorkflowContext(BaseContext):
                  task_max_attempts=1,
                  task_retry_interval=0,
                  task_ignore_failure=False,
-                 execution_id=None,
                  *args, **kwargs):
         super(WorkflowContext, self).__init__(*args, **kwargs)
         self._workflow_name = workflow_name
-        self.parameters = parameters or {}
+        self._parameters = parameters or {}
         self._task_max_attempts = task_max_attempts
         self._task_retry_interval = task_retry_interval
         self._task_ignore_failure = task_ignore_failure
-        # TODO: execution creation should happen somewhere else
-        # should be moved there, when such logical place exists
-        self._execution_id = execution_id or self._create_execution()
         self._register_logger()
 
     def __repr__(self):
         return (
             '{name}(deployment_id={self._service_id}, '
-            'workflow_name={self._workflow_name}'.format(
+            'workflow_name={self._workflow_name}, execution_id={self._execution_id})'.format(
                 name=self.__class__.__name__, self=self))
 
-    def _create_execution(self):
-        now = datetime.utcnow()
-        execution = self.model.execution.model_cls(
-            service=self.service,
-            workflow_name=self._workflow_name,
-            created_at=now,
-            parameters=self.parameters,
-        )
-        self.model.execution.put(execution)
-        return execution.id
-
     @property
     def logging_id(self):
         return '{0}[{1}]'.format(self._workflow_name, self._execution_id)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/exceptions.py b/aria/orchestrator/exceptions.py
index c00b66b..8d3dcc6 100644
--- a/aria/orchestrator/exceptions.py
+++ b/aria/orchestrator/exceptions.py
@@ -25,6 +25,13 @@ class OrchestratorError(AriaError):
     pass
 
 
+class InvalidPluginError(AriaError):
+    """
+    Raised when a plugin archive fails validation
+    """
+    pass
+
+
 class PluginAlreadyExistsError(AriaError):
     """
     Raised when a plugin with the same package name and package version already exists
@@ -46,3 +53,24 @@ class TaskAbortException(RuntimeError):
     Used internally when ctx.task.abort is called
     """
     pass
+
+
+class UndeclaredWorkflowError(AriaError):
+    """
+    Raised when attempting to execute an undeclared workflow
+    """
+    pass
+
+
+class ActiveExecutionsError(AriaError):
+    """
+    Raised when attempting to execute a workflow on a service which already has an active execution
+    """
+    pass
+
+
+class WorkflowImplementationNotFoundError(AriaError):
+    """
+    Raised when attempting to import a workflow's code but the implementation is not found
+    """
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/execution_plugin/ctx_proxy/server.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/ctx_proxy/server.py b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
index 817d064..52a5312 100644
--- a/aria/orchestrator/execution_plugin/ctx_proxy/server.py
+++ b/aria/orchestrator/execution_plugin/ctx_proxy/server.py
@@ -24,6 +24,7 @@ import StringIO
 import wsgiref.simple_server
 
 import bottle
+from aria import modeling
 
 from .. import exceptions
 
@@ -111,7 +112,7 @@ class CtxProxy(object):
             result = json.dumps({
                 'type': result_type,
                 'payload': payload
-            })
+            }, cls=modeling.utils.ModelJSONEncoder)
         except Exception as e:
             traceback_out = StringIO.StringIO()
             traceback.print_exc(file=traceback_out)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/execution_plugin/instantiation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/execution_plugin/instantiation.py b/aria/orchestrator/execution_plugin/instantiation.py
index 960835c..7627a38 100644
--- a/aria/orchestrator/execution_plugin/instantiation.py
+++ b/aria/orchestrator/execution_plugin/instantiation.py
@@ -27,7 +27,7 @@ def configure_operation(operation):
     arguments = OrderedDict()
     arguments['script_path'] = operation.implementation
     arguments['process'] = _get_process(configuration.pop('process')) \
-        if 'process' in configuration else None
+        if 'process' in configuration else dict()
 
     host = None
     interface = operation.interface

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/plugin.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/plugin.py b/aria/orchestrator/plugin.py
index d815754..f99666c 100644
--- a/aria/orchestrator/plugin.py
+++ b/aria/orchestrator/plugin.py
@@ -17,6 +17,7 @@ import os
 import tempfile
 import subprocess
 import sys
+import zipfile
 from datetime import datetime
 
 import wagon
@@ -43,11 +44,11 @@ class PluginManager(object):
         os_props = metadata['build_server_os_properties']
 
         plugin = cls(
+            name=metadata['package_name'],
             archive_name=metadata['archive_name'],
             supported_platform=metadata['supported_platform'],
             supported_py_versions=metadata['supported_python_versions'],
-            # Remove suffix colon after upgrading wagon to > 0.5.0
-            distribution=os_props.get('distribution:') or os_props.get('distribution'),
+            distribution=os_props.get('distribution'),
             distribution_release=os_props['distribution_version'],
             distribution_version=os_props['distribution_release'],
             package_name=metadata['package_name'],
@@ -70,6 +71,28 @@ class PluginManager(object):
             self._plugins_dir,
             '{0}-{1}'.format(plugin.package_name, plugin.package_version))
 
+    @staticmethod
+    def validate_plugin(source):
+        """
+        Validate a plugin archive.
+        A valid plugin is a wagon (http://github.com/cloudify-cosmo/wagon)
+        in zip format (the file suffix may also be .wgn).
+        """
+        if not zipfile.is_zipfile(source):
+            raise exceptions.InvalidPluginError(
+                'Archive {0} is of an unsupported type. Only '
+                'zip/wgn is allowed'.format(source))
+        with zipfile.ZipFile(source, 'r') as zip_file:
+            infos = zip_file.infolist()
+            try:
+                package_name = infos[0].filename[:infos[0].filename.index('/')]
+                package_json_path = "{0}/{1}".format(package_name, 'package.json')
+                zip_file.getinfo(package_json_path)
+            except (KeyError, ValueError, IndexError):
+                raise exceptions.InvalidPluginError(
+                    'Failed to validate plugin {0} '
+                    '(package.json was not found in archive)'.format(source))
+
     def _install_wagon(self, source, prefix):
         pip_freeze_output = self._pip_freeze()
         file_descriptor, constraint_path = tempfile.mkstemp(prefix='constraint-', suffix='.txt')

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py
deleted file mode 100644
index f1633fa..0000000
--- a/aria/orchestrator/runner.py
+++ /dev/null
@@ -1,101 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Workflow runner
-"""
-
-import tempfile
-import os
-
-from .context.workflow import WorkflowContext
-from .workflows.core.engine import Engine
-from .workflows.executor.thread import ThreadExecutor
-from ..storage import (
-    sql_mapi,
-    filesystem_rapi,
-)
-from .. import (
-    application_model_storage,
-    application_resource_storage
-)
-
-
-class Runner(object):
-    """
-    Runs workflows on a deployment. By default uses temporary storage (either on disk or in memory)
-    but can also be used with existing storage.
-
-    Handles the initialization of the storage engine and provides convenience methods for
-    sub-classes to create tasks.
-
-    :param path: path to Sqlite database file; use '' (the default) to use a temporary file,
-                 and None to use an in-memory database
-    :type path: string
-    """
-
-    def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn,
-                 service_id_fn, storage_path='', is_storage_temporary=True):
-        if storage_path == '':
-            # Temporary file storage
-            the_file, storage_path = tempfile.mkstemp(suffix='.db', prefix='aria-')
-            os.close(the_file)
-
-        self._storage_path = storage_path
-        self._storage_dir = os.path.dirname(storage_path)
-        self._storage_name = os.path.basename(storage_path)
-        self._is_storage_temporary = is_storage_temporary
-
-        workflow_context = self.create_workflow_context(workflow_name, initialize_model_storage_fn,
-                                                        service_id_fn)
-
-        tasks_graph = workflow_fn(ctx=workflow_context, **inputs)
-
-        self._engine = Engine(
-            executor=ThreadExecutor(),
-            workflow_context=workflow_context,
-            tasks_graph=tasks_graph)
-
-    def run(self):
-        try:
-            self._engine.execute()
-        finally:
-            self.cleanup()
-
-    def create_workflow_context(self,
-                                workflow_name,
-                                initialize_model_storage_fn,
-                                service_id_fn):
-        self.cleanup()
-        model_storage = application_model_storage(
-            sql_mapi.SQLAlchemyModelAPI,
-            initiator_kwargs=dict(base_dir=self._storage_dir, filename=self._storage_name))
-        if initialize_model_storage_fn:
-            initialize_model_storage_fn(model_storage)
-        resource_storage = application_resource_storage(
-            filesystem_rapi.FileSystemResourceAPI, api_kwargs=dict(directory='.'))
-        return WorkflowContext(
-            name=workflow_name,
-            model_storage=model_storage,
-            resource_storage=resource_storage,
-            service_id=service_id_fn(),
-            workflow_name=self.__class__.__name__,
-            task_max_attempts=1,
-            task_retry_interval=1)
-
-    def cleanup(self):
-        if (self._is_storage_temporary and (self._storage_path is not None) and
-                os.path.isfile(self._storage_path)):
-            os.remove(self._storage_path)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflow_runner.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflow_runner.py b/aria/orchestrator/workflow_runner.py
new file mode 100644
index 0000000..1ea60a1
--- /dev/null
+++ b/aria/orchestrator/workflow_runner.py
@@ -0,0 +1,161 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Workflow runner
+"""
+
+import os
+import sys
+from datetime import datetime
+
+from . import exceptions
+from .context.workflow import WorkflowContext
+from .workflows import builtin
+from .workflows.core.engine import Engine
+from .workflows.executor.process import ProcessExecutor
+from ..modeling import models
+from ..modeling import utils as modeling_utils
+from ..utils.imports import import_fullname
+
+
+DEFAULT_TASK_MAX_ATTEMPTS = 30
+DEFAULT_TASK_RETRY_INTERVAL = 30
+
+
+class WorkflowRunner(object):
+
+    def __init__(self, workflow_name, service_id, inputs,
+                 model_storage, resource_storage, plugin_manager,
+                 executor=None, task_max_attempts=DEFAULT_TASK_MAX_ATTEMPTS,
+                 task_retry_interval=DEFAULT_TASK_RETRY_INTERVAL):
+        """
+        Manages a single workflow execution on a given service
+        :param workflow_name: Workflow name
+        :param service_id: Service id
+        :param inputs: A key-value dict of inputs for the execution
+        :param model_storage: Model storage
+        :param resource_storage: Resource storage
+        :param plugin_manager: Plugin manager
+        :param executor: Executor for tasks. Defaults to a ProcessExecutor instance.
+        :param task_max_attempts: Maximum attempts of repeating each failing task
+        :param task_retry_interval: Retry interval in between retry attempts of a failing task
+        """
+
+        self._model_storage = model_storage
+        self._resource_storage = resource_storage
+        self._workflow_name = workflow_name
+
+        # the IDs are stored rather than the models themselves, so this module could be used
+        # by several threads without raising errors on model objects shared between threads
+        self._service_id = service_id
+
+        self._validate_workflow_exists_for_service()
+
+        workflow_fn = self._get_workflow_fn()
+
+        execution = self._create_execution_model(inputs)
+        self._execution_id = execution.id
+
+        workflow_context = WorkflowContext(
+            name=self.__class__.__name__,
+            model_storage=self._model_storage,
+            resource_storage=resource_storage,
+            service_id=service_id,
+            execution_id=execution.id,
+            workflow_name=workflow_name,
+            task_max_attempts=task_max_attempts,
+            task_retry_interval=task_retry_interval)
+
+        # transforming the execution inputs to dict, to pass them to the workflow function
+        execution_inputs_dict = dict(inp.unwrap() for inp in self.execution.inputs.values())
+        self._tasks_graph = workflow_fn(ctx=workflow_context, **execution_inputs_dict)
+
+        executor = executor or ProcessExecutor(plugin_manager=plugin_manager)
+        self._engine = Engine(
+            executor=executor,
+            workflow_context=workflow_context,
+            tasks_graph=self._tasks_graph)
+
+    @property
+    def execution(self):
+        return self._model_storage.execution.get(self._execution_id)
+
+    @property
+    def service(self):
+        return self._model_storage.service.get(self._service_id)
+
+    def execute(self):
+        self._engine.execute()
+
+    def cancel(self):
+        self._engine.cancel_execution()
+
+    def _create_execution_model(self, inputs):
+        execution = models.Execution(
+            created_at=datetime.utcnow(),
+            service=self.service,
+            workflow_name=self._workflow_name,
+            inputs={})
+
+        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+            workflow_inputs = dict()  # built-in workflows don't have any inputs
+        else:
+            workflow_inputs = self.service.workflows[self._workflow_name].inputs
+
+        execution.inputs = modeling_utils.create_inputs(inputs, workflow_inputs)
+        # TODO: these two following calls should execute atomically
+        self._validate_no_active_executions(execution)
+        self._model_storage.execution.put(execution)
+        return execution
+
+    def _validate_workflow_exists_for_service(self):
+        if self._workflow_name not in self.service.workflows and \
+                        self._workflow_name not in builtin.BUILTIN_WORKFLOWS:
+            raise exceptions.UndeclaredWorkflowError(
+                'No workflow policy {0} declared in service {1}'
+                .format(self._workflow_name, self.service.name))
+
+    def _validate_no_active_executions(self, execution):
+        active_executions = [e for e in self.service.executions if e.is_active()]
+        if active_executions:
+            raise exceptions.ActiveExecutionsError(
+                "Can't start execution; Service {0} has an active execution with id {1}"
+                .format(self.service.name, active_executions[0].id))
+
+    def _get_workflow_fn(self):
+        if self._workflow_name in builtin.BUILTIN_WORKFLOWS:
+            return import_fullname('{0}.{1}'.format(builtin.BUILTIN_WORKFLOWS_PATH_PREFIX,
+                                                    self._workflow_name))
+
+        workflow = self.service.workflows[self._workflow_name]
+
+        # TODO: Custom workflow support needs improvement. Currently this code uses internal
+        # knowledge of the resource storage; instead, workflows should probably be loaded
+        # in a similar manner to operation plugins. Also consider passing the paths to
+        # import_fullname instead of appending them to sys.path.
+        service_template_resources_path = os.path.join(
+            self._resource_storage.service_template.base_path,
+            str(self.service.service_template.id))
+        sys.path.append(service_template_resources_path)
+
+        try:
+            workflow_fn = import_fullname(workflow.implementation)
+        except ImportError:
+            raise exceptions.WorkflowImplementationNotFoundError(
+                'Could not find workflow {0} implementation at {1}'.format(
+                    self._workflow_name, workflow.implementation))
+
+        return workflow_fn

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/api/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py
index 49c584c..82c40c3 100644
--- a/aria/orchestrator/workflows/api/task.py
+++ b/aria/orchestrator/workflows/api/task.py
@@ -16,18 +16,16 @@
 """
 Provides the tasks to be entered into the task graph
 """
-import copy
 
+from ... import context
 from ....modeling import models
-from ....utils.collections import (OrderedDict, FrozenDict)
+from ....modeling import utils as modeling_utils
 from ....utils.uuid import generate_uuid
-from ... import context
-from .. import exceptions
 
 
 class BaseTask(object):
     """
-    Abstract task_graph task
+    Abstract task graph task
     """
 
     def __init__(self, ctx=None, **kwargs):
@@ -56,14 +54,13 @@ class BaseTask(object):
 
 class OperationTask(BaseTask):
     """
-    Represents an operation task in the task graph.
+    Represents an operation task in the task graph
     """
 
     NAME_FORMAT = '{interface}:{operation}@{type}:{name}'
 
     def __init__(self,
                  actor,
-                 actor_type,
                  interface_name,
                  operation_name,
                  inputs=None,
@@ -75,122 +72,101 @@ class OperationTask(BaseTask):
         :meth:`for_relationship`.
         """
 
+        actor_type = type(actor).__name__.lower()
+        assert isinstance(actor, (models.Node, models.Relationship))
+        assert actor_type in ('node', 'relationship')
         assert interface_name and operation_name
         super(OperationTask, self).__init__()
 
-        operation = None
-        interface = actor.interfaces.get(interface_name)
-        if interface is not None:
-            operation = interface.operations.get(operation_name)
-
-        if operation is None:
-            raise exceptions.OperationNotFoundException(
-                'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
-                .format(operation_name, interface_name, actor_type, actor.name))
-
-        if operation.implementation is None:
-            raise exceptions.OperationNotFoundException(
-                'Empty operation "{0}" on interface "{1}" for {2} "{3}"'
-                .format(operation_name, interface_name, actor_type, actor.name))
-
         self.actor = actor
-        self.actor_type = actor_type
-        self.interface_name = interface_name
-        self.operation_name = operation_name
-
-        self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
-                                                     name=actor.name,
-                                                     interface=interface_name,
-                                                     operation=operation_name)
         self.max_attempts = (self.workflow_context._task_max_attempts
                              if max_attempts is None else max_attempts)
         self.retry_interval = (self.workflow_context._task_retry_interval
                                if retry_interval is None else retry_interval)
         self.ignore_failure = (self.workflow_context._task_ignore_failure
                                if ignore_failure is None else ignore_failure)
-        self.implementation = operation.implementation
-        self.plugin = operation.plugin
+        self.interface_name = interface_name
+        self.operation_name = operation_name
 
-        # Wrap inputs
-        inputs = copy.deepcopy(inputs) if inputs else {}
-        for k, v in inputs.iteritems():
-            if not isinstance(v, models.Parameter):
-                inputs[k] = models.Parameter.wrap(k, v)
+        operation = self.actor.interfaces[self.interface_name].operations[self.operation_name]
+        self.plugin = operation.plugin
+        self.inputs = modeling_utils.create_inputs(inputs or {}, operation.inputs)
+        self.implementation = operation.implementation
+        self.name = OperationTask.NAME_FORMAT.format(type=actor_type,
+                                                     name=actor.name,
+                                                     interface=self.interface_name,
+                                                     operation=self.operation_name)
 
-        self.inputs = OrderedDict(operation.inputs)
-        if inputs:
-            self.inputs.update(inputs)
-        self.inputs = FrozenDict(self.inputs)
+    def __repr__(self):
+        return self.name
 
     @classmethod
     def for_node(cls,
                  node,
                  interface_name,
                  operation_name,
-                 inputs=None,
                  max_attempts=None,
                  retry_interval=None,
-                 ignore_failure=None):
+                 ignore_failure=None,
+                 inputs=None):
         """
         Creates an operation on a node.
 
         :param node: The node on which to run the operation
         :param interface_name: The interface name
         :param operation_name: The operation name within the interface
-        :param inputs: Override the operation's inputs
         :param max_attempts: The maximum number of attempts in case the operation fails
-                             (if not specified the defaults is taken from the workflow context)
+                             (if not specified the default is taken from the workflow context)
         :param retry_interval: The interval in seconds between attempts when the operation fails
-                               (if not specified the defaults is taken from the workflow context)
+                               (if not specified the default is taken from the workflow context)
         :param ignore_failure: Whether to ignore failures
-                               (if not specified the defaults is taken from the workflow context)
+                               (if not specified the default is taken from the workflow context)
+        :param inputs: Additional operation inputs
         """
 
         assert isinstance(node, models.Node)
         return cls(
             actor=node,
-            actor_type='node',
             interface_name=interface_name,
             operation_name=operation_name,
-            inputs=inputs,
             max_attempts=max_attempts,
             retry_interval=retry_interval,
-            ignore_failure=ignore_failure)
+            ignore_failure=ignore_failure,
+            inputs=inputs)
 
     @classmethod
     def for_relationship(cls,
                          relationship,
                          interface_name,
                          operation_name,
-                         inputs=None,
                          max_attempts=None,
                          retry_interval=None,
-                         ignore_failure=None):
+                         ignore_failure=None,
+                         inputs=None):
         """
-        Creates an operation on a relationship.
+        Creates an operation on a relationship edge.
 
         :param relationship: The relationship on which to run the operation
         :param interface_name: The interface name
         :param operation_name: The operation name within the interface
-        :param inputs: Override the operation's inputs
         :param max_attempts: The maximum number of attempts in case the operation fails
-                             (if not specified the defaults is taken from the workflow context)
+                             (if not specified the default is taken from the workflow context)
         :param retry_interval: The interval in seconds between attempts when the operation fails
-                               (if not specified the defaults is taken from the workflow context)
+                               (if not specified the default is taken from the workflow context)
         :param ignore_failure: Whether to ignore failures
-                               (if not specified the defaults is taken from the workflow context)
+                               (if not specified the default is taken from the workflow context)
+        :param inputs: Additional operation inputs
         """
 
         assert isinstance(relationship, models.Relationship)
         return cls(
             actor=relationship,
-            actor_type='relationship',
             interface_name=interface_name,
             operation_name=operation_name,
-            inputs=inputs,
             max_attempts=max_attempts,
             retry_interval=retry_interval,
-            ignore_failure=ignore_failure)
+            ignore_failure=ignore_failure,
+            inputs=inputs)
 
 
 class WorkflowTask(BaseTask):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/builtin/__init__.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/__init__.py b/aria/orchestrator/workflows/builtin/__init__.py
index d43a962..8b13c62 100644
--- a/aria/orchestrator/workflows/builtin/__init__.py
+++ b/aria/orchestrator/workflows/builtin/__init__.py
@@ -24,6 +24,7 @@ from .stop import stop
 
 
 BUILTIN_WORKFLOWS = ('install', 'uninstall', 'start', 'stop')
+BUILTIN_WORKFLOWS_PATH_PREFIX = 'aria.orchestrator.workflows.builtin'
 
 
 __all__ = [

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/builtin/execute_operation.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/execute_operation.py b/aria/orchestrator/workflows/builtin/execute_operation.py
index 348f47a..16504ec 100644
--- a/aria/orchestrator/workflows/builtin/execute_operation.py
+++ b/aria/orchestrator/workflows/builtin/execute_operation.py
@@ -17,7 +17,7 @@
 Builtin execute_operation workflow
 """
 
-from ..api.task import OperationTask
+from . import utils
 from ... import workflow
 
 
@@ -28,7 +28,6 @@ def execute_operation(
         interface_name,
         operation_name,
         operation_kwargs,
-        allow_kwargs_override,
         run_by_dependency_order,
         type_names,
         node_template_ids,
@@ -41,7 +40,6 @@ def execute_operation(
     :param TaskGraph graph: the graph which will describe the workflow.
     :param basestring operation: the operation name to execute
     :param dict operation_kwargs:
-    :param bool allow_kwargs_override:
     :param bool run_by_dependency_order:
     :param type_names:
     :param node_template_ids:
@@ -71,8 +69,7 @@ def execute_operation(
                 node=node,
                 interface_name=interface_name,
                 operation_name=operation_name,
-                operation_kwargs=operation_kwargs,
-                allow_kwargs_override=allow_kwargs_override
+                operation_kwargs=operation_kwargs
             )
         )
 
@@ -108,21 +105,16 @@ def _create_node_task(
         node,
         interface_name,
         operation_name,
-        operation_kwargs,
-        allow_kwargs_override):
+        operation_kwargs):
     """
     A workflow which executes a single operation
     :param node: the node instance to install
     :param basestring operation: the operation name
     :param dict operation_kwargs:
-    :param bool allow_kwargs_override:
     :return:
     """
 
-    if allow_kwargs_override is not None:
-        operation_kwargs['allow_kwargs_override'] = allow_kwargs_override
-
-    return OperationTask.for_node(
+    return utils.create_node_task(
         node=node,
         interface_name=interface_name,
         operation_name=operation_name,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/builtin/utils.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/builtin/utils.py b/aria/orchestrator/workflows/builtin/utils.py
index 752fe35..2254d13 100644
--- a/aria/orchestrator/workflows/builtin/utils.py
+++ b/aria/orchestrator/workflows/builtin/utils.py
@@ -12,26 +12,31 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from ..api.task import OperationTask
+
+from ..api.task import OperationTask, StubTask
 from .. import exceptions
 
 
-def create_node_task(node, interface_name, operation_name):
+def create_node_task(node, interface_name, operation_name, **kwargs):
     """
     Returns a new operation task if the operation exists in the node, otherwise returns None.
     """
 
     try:
+        if _is_empty_task(node, interface_name, operation_name):
+            return StubTask()
+
         return OperationTask.for_node(node=node,
                                       interface_name=interface_name,
-                                      operation_name=operation_name)
+                                      operation_name=operation_name,
+                                      **kwargs)
     except exceptions.OperationNotFoundException:
         # We will skip nodes which do not have the operation
         return None
 
 
 def create_relationships_tasks(
-        node, interface_name, source_operation_name=None, target_operation_name=None):
+        node, interface_name, source_operation_name=None, target_operation_name=None, **kwargs):
     """
     Creates a relationship task (source and target) for all of a node_instance relationships.
     :param basestring source_operation_name: the relationship operation name.
@@ -43,21 +48,18 @@ def create_relationships_tasks(
     """
     sub_tasks = []
     for relationship in node.outbound_relationships:
-        try:
-            relationship_operations = relationship_tasks(
-                relationship,
-                interface_name,
-                source_operation_name=source_operation_name,
-                target_operation_name=target_operation_name)
-            sub_tasks.append(relationship_operations)
-        except exceptions.OperationNotFoundException:
-            # We will skip relationships which do not have the operation
-            pass
+        relationship_operations = relationship_tasks(
+            relationship,
+            interface_name,
+            source_operation_name=source_operation_name,
+            target_operation_name=target_operation_name,
+            **kwargs)
+        sub_tasks.append(relationship_operations)
     return sub_tasks
 
 
-def relationship_tasks(
-        relationship, interface_name, source_operation_name=None, target_operation_name=None):
+def relationship_tasks(relationship, interface_name, source_operation_name=None,
+                       target_operation_name=None, **kwargs):
     """
     Creates a relationship task source and target.
     :param Relationship relationship: the relationship instance itself
@@ -68,17 +70,33 @@ def relationship_tasks(
     """
     operations = []
     if source_operation_name:
-        operations.append(
-            OperationTask.for_relationship(relationship=relationship,
-                                           interface_name=interface_name,
-                                           operation_name=source_operation_name)
-        )
+        try:
+            if _is_empty_task(relationship, interface_name, source_operation_name):
+                operations.append(StubTask())
+            else:
+                operations.append(
+                    OperationTask.for_relationship(relationship=relationship,
+                                                   interface_name=interface_name,
+                                                   operation_name=source_operation_name,
+                                                   **kwargs)
+                )
+        except exceptions.OperationNotFoundException:
+            # We will skip relationships which do not have the operation
+            pass
     if target_operation_name:
-        operations.append(
-            OperationTask.for_relationship(relationship=relationship,
-                                           interface_name=interface_name,
-                                           operation_name=target_operation_name)
-        )
+        try:
+            if _is_empty_task(relationship, interface_name, target_operation_name):
+                operations.append(StubTask())
+            else:
+                operations.append(
+                    OperationTask.for_relationship(relationship=relationship,
+                                                   interface_name=interface_name,
+                                                   operation_name=target_operation_name,
+                                                   **kwargs)
+                )
+        except exceptions.OperationNotFoundException:
+            # We will skip relationships which do not have the operation
+            pass
 
     return operations
 
@@ -106,3 +124,15 @@ def create_node_task_dependencies(graph, tasks_and_nodes, reverse=False):
                     graph.add_dependency(dependency, task)
             else:
                 graph.add_dependency(task, dependencies)
+
+
+def _is_empty_task(actor, interface_name, operation_name):
+    interface = actor.interfaces.get(interface_name)
+    if interface:
+        operation = interface.operations.get(operation_name)
+        if operation:
+            return operation.implementation is None
+
+    raise exceptions.OperationNotFoundException(
+        'Could not find operation "{0}" on interface "{1}" for {2} "{3}"'
+        .format(operation_name, interface_name, type(actor).__name__.lower(), actor.name))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/core/engine.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py
index f73cade..155d0ee 100644
--- a/aria/orchestrator/workflows/core/engine.py
+++ b/aria/orchestrator/workflows/core/engine.py
@@ -88,12 +88,12 @@ class Engine(logger.LoggerMixin):
     def _executable_tasks(self):
         now = datetime.utcnow()
         return (task for task in self._tasks_iter()
-                if task.is_waiting and
+                if task.is_waiting() and
                 task.due_at <= now and
                 not self._task_has_dependencies(task))
 
     def _ended_tasks(self):
-        return (task for task in self._tasks_iter() if task.has_ended)
+        return (task for task in self._tasks_iter() if task.has_ended())
 
     def _task_has_dependencies(self, task):
         return len(self._execution_graph.pred.get(task.id, {})) > 0
@@ -105,7 +105,7 @@ class Engine(logger.LoggerMixin):
         for _, data in self._execution_graph.nodes_iter(data=True):
             task = data['task']
             if isinstance(task, engine_task.OperationTask):
-                if not task.model_task.has_ended:
+                if not task.model_task.has_ended():
                     self._workflow_context.model.task.refresh(task.model_task)
             yield task
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/core/task.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py
index ba93e21..2b26152 100644
--- a/aria/orchestrator/workflows/core/task.py
+++ b/aria/orchestrator/workflows/core/task.py
@@ -69,11 +69,9 @@ class StubTask(BaseTask):
         self.status = models.Task.PENDING
         self.due_at = datetime.utcnow()
 
-    @property
     def has_ended(self):
         return self.status in (models.Task.SUCCESS, models.Task.FAILED)
 
-    @property
     def is_waiting(self):
         return self.status in (models.Task.PENDING, models.Task.RETRYING)
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/exceptions.py b/aria/orchestrator/workflows/exceptions.py
index 0ca263f..b5ae496 100644
--- a/aria/orchestrator/workflows/exceptions.py
+++ b/aria/orchestrator/workflows/exceptions.py
@@ -16,6 +16,8 @@
 """
 Workflow related Exception classes
 """
+import os
+
 from .. import exceptions
 
 
@@ -52,10 +54,10 @@ class ProcessException(ExecutorException):
         Describes the error in detail
         """
         return (
-            'Command "{error.command}" executed with an error.\n'
-            'code: {error.return_code}\n'
-            'error: {error.stderr}\n'
-            'output: {error.stdout}'.format(error=self))
+            'Command "{error.command}" executed with an error.{0}'
+            'code: {error.return_code}{0}'
+            'error: {error.stderr}{0}'
+            'output: {error.stdout}'.format(os.linesep, error=self))
 
 
 class AriaEngineError(exceptions.AriaError):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/celery.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/celery.py b/aria/orchestrator/workflows/executor/celery.py
index baa0375..7bd9b7c 100644
--- a/aria/orchestrator/workflows/executor/celery.py
+++ b/aria/orchestrator/workflows/executor/celery.py
@@ -44,7 +44,7 @@ class CeleryExecutor(BaseExecutor):
 
     def execute(self, task):
         self._tasks[task.id] = task
-        inputs = dict((k, v.value) for k, v in task.inputs.iteritems())
+        inputs = dict(inp.unwrap() for inp in task.inputs.values())
         inputs['ctx'] = task.context
         self._results[task.id] = self._app.send_task(
             task.operation_mapping,

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/dry.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/dry.py b/aria/orchestrator/workflows/executor/dry.py
new file mode 100644
index 0000000..d894b25
--- /dev/null
+++ b/aria/orchestrator/workflows/executor/dry.py
@@ -0,0 +1,51 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Dry executor
+"""
+
+from datetime import datetime
+
+from .base import BaseExecutor
+
+
+class DryExecutor(BaseExecutor):
+    """
+    Executor which dry runs tasks - prints task information without causing any side effects
+    """
+
+    def execute(self, task):
+        # updating the task manually instead of calling self._task_started(task),
+        # to avoid any side effects raising that event might cause
+        with task._update():
+            task.started_at = datetime.utcnow()
+            task.status = task.STARTED
+
+        actor_type = type(task.actor).__name__.lower()
+        implementation = '{0} > '.format(task.plugin) if task.plugin else ''
+        implementation += task.implementation
+        inputs = dict(inp.unwrap() for inp in task.inputs.values())
+
+        task.context.logger.info(
+            'Executing {actor_type} {task.actor.name} operation {task.interface_name} '
+            '{task.operation_name}: {implementation} (Inputs: {inputs})'
+            .format(actor_type=actor_type, task=task, implementation=implementation, inputs=inputs))
+
+        # updating the task manually instead of calling self._task_succeeded(task),
+        # to avoid any side effects raising that event might cause
+        with task._update():
+            task.ended_at = datetime.utcnow()
+            task.status = task.SUCCESS

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/process.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py
index f814c4d..851d78e 100644
--- a/aria/orchestrator/workflows/executor/process.py
+++ b/aria/orchestrator/workflows/executor/process.py
@@ -148,7 +148,7 @@ class ProcessExecutor(base.BaseExecutor):
         return {
             'task_id': task.id,
             'implementation': task.implementation,
-            'operation_inputs': dict((k, v.value) for k, v in task.inputs.iteritems()),
+            'operation_inputs': dict(inp.unwrap() for inp in task.inputs.values()),
             'port': self._server_port,
             'context': task.context.serialization_dict,
         }

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/orchestrator/workflows/executor/thread.py
----------------------------------------------------------------------
diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py
index 1a49af5..f422592 100644
--- a/aria/orchestrator/workflows/executor/thread.py
+++ b/aria/orchestrator/workflows/executor/thread.py
@@ -21,6 +21,7 @@ import Queue
 import threading
 
 from aria.utils import imports
+
 from .base import BaseExecutor
 
 
@@ -58,7 +59,7 @@ class ThreadExecutor(BaseExecutor):
                 self._task_started(task)
                 try:
                     task_func = imports.load_attribute(task.implementation)
-                    inputs = dict((k, v.value) for k, v in task.inputs.iteritems())
+                    inputs = dict(inp.unwrap() for inp in task.inputs.values())
                     task_func(ctx=task.context, **inputs)
                     self._task_succeeded(task)
                 except BaseException as e:

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/parser/consumption/__init__.py
----------------------------------------------------------------------
diff --git a/aria/parser/consumption/__init__.py b/aria/parser/consumption/__init__.py
index 7da8490..8f6d2b6 100644
--- a/aria/parser/consumption/__init__.py
+++ b/aria/parser/consumption/__init__.py
@@ -17,10 +17,21 @@
 from .exceptions import ConsumerException
 from .context import ConsumptionContext
 from .style import Style
-from .consumer import Consumer, ConsumerChain
+from .consumer import (
+    Consumer,
+    ConsumerChain
+)
 from .presentation import Read
 from .validation import Validate
-from .modeling import ServiceTemplate, Types, ServiceInstance
+from .modeling import (
+    ServiceTemplate,
+    Types,
+    ServiceInstance,
+    FindHosts,
+    ConfigureOperations,
+    SatisfyRequirements,
+    ValidateCapabilities
+)
 from .inputs import Inputs
 
 __all__ = (
@@ -34,4 +45,7 @@ __all__ = (
     'ServiceTemplate',
     'Types',
     'ServiceInstance',
-    'Inputs')
+    'Inputs',
+    'SatisfyRequirements',
+    'ValidateCapabilities'
+)

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/parser/consumption/modeling.py
----------------------------------------------------------------------
diff --git a/aria/parser/consumption/modeling.py b/aria/parser/consumption/modeling.py
index 6c616b4..771fd7f 100644
--- a/aria/parser/consumption/modeling.py
+++ b/aria/parser/consumption/modeling.py
@@ -106,7 +106,8 @@ class InstantiateServiceInstance(Consumer):
                                            'template')
             return
 
-        self.context.modeling.template.instantiate(None)
+        self.context.modeling.template.instantiate(None, None,
+                                                   inputs=dict(self.context.modeling.inputs))
 
 
 class CoerceServiceInstanceValues(Consumer):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/core.py
----------------------------------------------------------------------
diff --git a/aria/storage/core.py b/aria/storage/core.py
index 8302fc9..8caca66 100644
--- a/aria/storage/core.py
+++ b/aria/storage/core.py
@@ -38,7 +38,7 @@ API:
     * StorageDriver - class, abstract model implementation.
 """
 
-from aria.logger import LoggerMixin
+from aria.logger import LoggerMixin, NullHandler
 from . import sql_mapi
 
 __all__ = (
@@ -71,6 +71,10 @@ class Storage(LoggerMixin):
         :param kwargs:
         """
         super(Storage, self).__init__(**kwargs)
+        # Set the logger handler of any storage object to NullHandler.
+        # This is since the absence of a handler shows up while using the CLI in the form of:
+        # `No handlers could be found for logger "aria.ResourceStorage"`.
+        self.logger.addHandler(NullHandler())
         self.api = api_cls
         self.registered = {}
         self._initiator = initiator

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/storage/exceptions.py b/aria/storage/exceptions.py
index f982f63..3f0ecec 100644
--- a/aria/storage/exceptions.py
+++ b/aria/storage/exceptions.py
@@ -23,3 +23,7 @@ class StorageError(exceptions.AriaError):
     General storage exception
     """
     pass
+
+
+class NotFoundError(StorageError):
+    """
+    Storage exception for a requested entry that could not be found
+    """
+    pass

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/instrumentation.py
----------------------------------------------------------------------
diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py
index 138432a..cf2a365 100644
--- a/aria/storage/instrumentation.py
+++ b/aria/storage/instrumentation.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import os
 import copy
 import json
 
@@ -189,9 +190,9 @@ def apply_tracked_changes(tracked_changes, model):
             if not value:
                 del successfully_updated_changes[key]
         model.logger.error(
-            'Registering all the changes to the storage has failed. \n'
-            'The successful updates were: \n '
-            '{0}'.format(json.dumps(successfully_updated_changes, indent=4)))
+            'Registering all the changes to the storage has failed. {0}'
+            'The successful updates were: {0} '
+            '{1}'.format(os.linesep, json.dumps(successfully_updated_changes, indent=4)))
 
         raise
 

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/storage/sql_mapi.py
----------------------------------------------------------------------
diff --git a/aria/storage/sql_mapi.py b/aria/storage/sql_mapi.py
index 8d34bb4..730d007 100644
--- a/aria/storage/sql_mapi.py
+++ b/aria/storage/sql_mapi.py
@@ -59,7 +59,7 @@ class SQLAlchemyModelAPI(api.ModelAPI):
         result = query.first()
 
         if not result:
-            raise exceptions.StorageError(
+            raise exceptions.NotFoundError(
                 'Requested `{0}` with ID `{1}` was not found'
                 .format(self.model_cls.__name__, entry_id)
             )
@@ -69,13 +69,13 @@ class SQLAlchemyModelAPI(api.ModelAPI):
         assert hasattr(self.model_cls, 'name')
         result = self.list(include=include, filters={'name': entry_name})
         if not result:
-            raise exceptions.StorageError(
-                'Requested {0} with NAME `{1}` was not found'
+            raise exceptions.NotFoundError(
+                'Requested {0} with name `{1}` was not found'
                 .format(self.model_cls.__name__, entry_name)
             )
         elif len(result) > 1:
             raise exceptions.StorageError(
-                'Requested {0} with NAME `{1}` returned more than 1 value'
+                'Requested {0} with name `{1}` returned more than 1 value'
                 .format(self.model_cls.__name__, entry_name)
             )
         else:
@@ -92,10 +92,8 @@ class SQLAlchemyModelAPI(api.ModelAPI):
         results, total, size, offset = self._paginate(query, pagination)
 
         return ListResult(
-            items=results,
-            metadata=dict(total=total,
-                          size=size,
-                          offset=offset)
+            dict(total=total, size=size, offset=offset),
+            results
         )
 
     def iter(self,
@@ -406,19 +404,11 @@ def init_storage(base_dir, filename='db.sqlite'):
     return dict(engine=engine, session=session)
 
 
-class ListResult(object):
+class ListResult(list):
     """
     a ListResult contains results about the requested items.
     """
-    def __init__(self, items, metadata):
-        self.items = items
+    def __init__(self, metadata, *args, **qwargs):
+        super(ListResult, self).__init__(*args, **qwargs)
         self.metadata = metadata
-
-    def __len__(self):
-        return len(self.items)
-
-    def __iter__(self):
-        return iter(self.items)
-
-    def __getitem__(self, item):
-        return self.items[item]
+        self.items = self

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/application.py
----------------------------------------------------------------------
diff --git a/aria/utils/application.py b/aria/utils/application.py
deleted file mode 100644
index 2f40825..0000000
--- a/aria/utils/application.py
+++ /dev/null
@@ -1,294 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Convenience storage related tools.
-# TODO rename module name
-"""
-
-import json
-import os
-import shutil
-import tarfile
-import tempfile
-from datetime import datetime
-
-from aria.storage.exceptions import StorageError
-from aria.logger import LoggerMixin
-
-
-class StorageManager(LoggerMixin):
-    """
-    Convenience wrapper to simplify work with the lower level storage mechanism
-    """
-
-    def __init__(
-            self,
-            model_storage,
-            resource_storage,
-            blueprint_path,
-            blueprint_id,
-            blueprint_plan,
-            deployment_id,
-            deployment_plan,
-            **kwargs):
-        super(StorageManager, self).__init__(**kwargs)
-        self.model_storage = model_storage
-        self.resource_storage = resource_storage
-        self.blueprint_path = blueprint_path
-        self.blueprint_id = blueprint_id
-        self.blueprint_plan = blueprint_plan
-        self.deployment_id = deployment_id
-        self.deployment_plan = deployment_plan
-
-    @classmethod
-    def from_deployment(
-            cls,
-            model_storage,
-            resource_storage,
-            deployment_id,
-            deployment_plan):
-        """
-        Create a StorageManager from a deployment
-        """
-        return cls(
-            model_storage=model_storage,
-            resource_storage=resource_storage,
-            deployment_id=deployment_id,
-            deployment_plan=deployment_plan,
-            blueprint_path=None,
-            blueprint_plan=None,
-            blueprint_id=None
-        )
-
-    @classmethod
-    def from_blueprint(
-            cls,
-            model_storage,
-            resource_storage,
-            blueprint_path,
-            blueprint_id,
-            blueprint_plan):
-        """
-        Create a StorageManager from a blueprint
-        """
-        return cls(
-            model_storage=model_storage,
-            resource_storage=resource_storage,
-            blueprint_path=blueprint_path,
-            blueprint_plan=blueprint_plan,
-            blueprint_id=blueprint_id,
-            deployment_id=None,
-            deployment_plan=None)
-
-    def create_blueprint_storage(self, source, main_file_name=None):
-        """
-        create blueprint model & resource
-        """
-        assert self.blueprint_path and self.blueprint_id
-        assert hasattr(self.resource_storage, 'blueprint')
-        assert hasattr(self.model_storage, 'blueprint')
-
-        self.logger.debug('creating blueprint resource storage entry')
-        self.resource_storage.service_template.upload(
-            entry_id=self.blueprint_id,
-            source=os.path.dirname(source))
-        self.logger.debug('created blueprint resource storage entry')
-
-        self.logger.debug('creating blueprint model storage entry')
-        now = datetime.utcnow()
-        blueprint = self.model_storage.service_template.model_cls(
-            plan=self.blueprint_plan,
-            id=self.blueprint_id,
-            description=self.blueprint_plan.get('description'),
-            created_at=now,
-            updated_at=now,
-            main_file_name=main_file_name,
-        )
-        self.model_storage.service_template.put(blueprint)
-        self.logger.debug('created blueprint model storage entry')
-
-    def create_nodes_storage(self):
-        """
-        create nodes model
-        """
-        assert self.blueprint_path and self.blueprint_id
-        assert hasattr(self.model_storage, 'node')
-        assert hasattr(self.model_storage, 'relationship')
-
-        for node in self.blueprint_plan['nodes']:
-            node_copy = node.copy()
-            for field in ('name',
-                          'deployment_plugins_to_install',
-                          'interfaces',
-                          'instances'):
-                node_copy.pop(field)
-            scalable = node_copy.pop('capabilities')['scalable']['properties']
-            for index, relationship in enumerate(node_copy['relationships']):
-                relationship = self.model_storage.relationship.model_cls(**relationship)
-                self.model_storage.relationship.put(relationship)
-                node_copy['relationships'][index] = relationship
-
-            node_copy = self.model_storage.node.model_cls(
-                blueprint_id=self.blueprint_id,
-                planned_number_of_instances=scalable['current_instances'],
-                deploy_number_of_instances=scalable['default_instances'],
-                min_number_of_instances=scalable['min_instances'],
-                max_number_of_instances=scalable['max_instances'],
-                number_of_instances=scalable['current_instances'],
-                **node_copy)
-            self.model_storage.node.put(node_copy)
-
-    def create_deployment_storage(self):
-        """
-        create deployment model & resource
-        """
-        assert self.deployment_id and self.deployment_plan
-
-        assert hasattr(self.resource_storage, 'blueprint')
-        assert hasattr(self.resource_storage, 'deployment')
-        assert hasattr(self.model_storage, 'deployment')
-
-        self.logger.debug('creating deployment resource storage entry')
-        temp_dir = tempfile.mkdtemp()
-        try:
-            self.resource_storage.service_template.download(
-                entry_id=self.blueprint_id,
-                destination=temp_dir)
-            self.resource_storage.service_instance.upload(
-                entry_id=self.deployment_id,
-                source=temp_dir)
-        finally:
-            shutil.rmtree(temp_dir, ignore_errors=True)
-        self.logger.debug('created deployment resource storage entry')
-
-        self.logger.debug('creating deployment model storage entry')
-        now = datetime.utcnow()
-        deployment = self.model_storage.service_instance.model_cls(
-            id=self.deployment_id,
-            blueprint_id=self.blueprint_id,
-            description=self.deployment_plan['description'],
-            workflows=self.deployment_plan['workflows'],
-            inputs=self.deployment_plan['inputs'],
-            policy_types=self.deployment_plan['policy_types'],
-            policy_triggers=self.deployment_plan['policy_triggers'],
-            groups=self.deployment_plan['groups'],
-            scaling_groups=self.deployment_plan['scaling_groups'],
-            outputs=self.deployment_plan['outputs'],
-            created_at=now,
-            updated_at=now
-        )
-        self.model_storage.service_instance.put(deployment)
-        self.logger.debug('created deployment model storage entry')
-
-    def create_node_instances_storage(self):
-        """
-        create node_instances model
-        """
-        assert self.deployment_id and self.deployment_plan
-        assert hasattr(self.model_storage, 'node_instance')
-        assert hasattr(self.model_storage, 'relationship_instance')
-
-        self.logger.debug('creating node-instances model storage entries')
-        for node_instance in self.deployment_plan['node_instances']:
-            node_model = self.model_storage.node.get(node_instance['node_id'])
-            relationship_instances = []
-
-            for index, relationship_instance in enumerate(node_instance['relationships']):
-                relationship_instance_model = self.model_storage.relationship.model_cls(
-                    relationship=node_model.relationships[index],
-                    target_name=relationship_instance['target_name'],
-                    type=relationship_instance['type'],
-                    target_id=relationship_instance['target_id'])
-                relationship_instances.append(relationship_instance_model)
-                self.model_storage.relationship.put(relationship_instance_model)
-
-            node_instance_model = self.model_storage.node.model_cls(
-                node=node_model,
-                id=node_instance['id'],
-                runtime_properties={},
-                state=self.model_storage.node.model_cls.UNINITIALIZED,
-                deployment_id=self.deployment_id,
-                version='1.0',
-                relationship_instances=relationship_instances)
-
-            self.model_storage.node.put(node_instance_model)
-        self.logger.debug('created node-instances model storage entries')
-
-    def create_plugin_storage(self, plugin_id, source):
-        """
-        create plugin model & resource
-        """
-        assert hasattr(self.model_storage, 'plugin')
-        assert hasattr(self.resource_storage, 'plugin')
-
-        self.logger.debug('creating plugin resource storage entry')
-        self.resource_storage.plugin.upload(entry_id=plugin_id, source=source)
-        self.logger.debug('created plugin resource storage entry')
-
-        self.logger.debug('creating plugin model storage entry')
-        plugin = _load_plugin_from_archive(source)
-        build_props = plugin.get('build_server_os_properties')
-        now = datetime.utcnow()
-
-        plugin = self.model_storage.plugin.model_cls(
-            id=plugin_id,
-            package_name=plugin.get('package_name'),
-            package_version=plugin.get('package_version'),
-            archive_name=plugin.get('archive_name'),
-            package_source=plugin.get('package_source'),
-            supported_platform=plugin.get('supported_platform'),
-            distribution=build_props.get('distribution'),
-            distribution_version=build_props.get('distribution_version'),
-            distribution_release=build_props.get('distribution_release'),
-            wheels=plugin.get('wheels'),
-            excluded_wheels=plugin.get('excluded_wheels'),
-            supported_py_versions=plugin.get('supported_python_versions'),
-            uploaded_at=now
-        )
-        self.model_storage.plugin.put(plugin)
-        self.logger.debug('created plugin model storage entry')
-
-
-def _load_plugin_from_archive(tar_source):
-    if not tarfile.is_tarfile(tar_source):
-        # TODO: go over the exceptions
-        raise StorageError(
-            'the provided tar archive can not be read.')
-
-    with tarfile.open(tar_source) as tar:
-        tar_members = tar.getmembers()
-        # a wheel plugin will contain exactly one sub directory
-        if not tar_members:
-            raise StorageError(
-                'archive file structure malformed. expecting exactly one '
-                'sub directory; got none.')
-        package_json_path = os.path.join(tar_members[0].name,
-                                         'package.json')
-        try:
-            package_member = tar.getmember(package_json_path)
-        except KeyError:
-            raise StorageError("'package.json' was not found under {0}"
-                               .format(package_json_path))
-        try:
-            package_json = tar.extractfile(package_member)
-        except tarfile.ExtractError as e:
-            raise StorageError(str(e))
-        try:
-            return json.load(package_json)
-        except ValueError as e:
-            raise StorageError("'package.json' is not a valid json: "
-                               "{json_str}. error is {error}"
-                               .format(json_str=package_json.read(), error=str(e)))

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/archive.py
----------------------------------------------------------------------
diff --git a/aria/utils/archive.py b/aria/utils/archive.py
new file mode 100644
index 0000000..63d9004
--- /dev/null
+++ b/aria/utils/archive.py
@@ -0,0 +1,63 @@
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+
+import os
+import tarfile
+import zipfile
+import tempfile
+from contextlib import closing
+
+
+def is_archive(source):
+    """
+    Tell whether ``source`` is recognized as a tar archive or as a zip archive.
+
+    :param source: path of the file to inspect
+    :return: ``True`` if either the ``tarfile`` or the ``zipfile`` module recognizes the file
+    """
+    if tarfile.is_tarfile(source):
+        return True
+    return zipfile.is_zipfile(source)
+
+
+def extract_archive(source):
+    """
+    Extract a tar or zip archive into a new temporary directory.
+
+    The tar check is made before the zip check.
+
+    :param source: path of the archive
+    :return: whatever ``untar`` / ``unzip`` return for the archive (the extraction directory)
+    :raises ValueError: if ``source`` is recognized neither as a tar nor as a zip archive
+    """
+    extractors = (
+        (tarfile.is_tarfile, untar),
+        (zipfile.is_zipfile, unzip),
+    )
+    for recognizes, extract in extractors:
+        if recognizes(source):
+            return extract(source)
+    raise ValueError(
+        'Unsupported archive type provided or archive is not valid: {0}.'.format(source))
+
+
+def tar(source, destination):
+    """
+    Create a gzip-compressed tar archive of ``source``.
+
+    The archive stores ``source`` under its own base name, so a directory ``/a/b`` ends up as
+    ``b/...`` inside the archive.
+
+    :param source: path of the file or directory to archive
+    :param destination: path of the archive file to write
+    """
+    # basename() of a path ending in a separator is '', which would drop the top-level name
+    # from the archive; normpath() strips the trailing separator first
+    arcname = os.path.basename(os.path.normpath(source))
+    with closing(tarfile.open(destination, 'w:gz')) as tar_archive:
+        tar_archive.add(source, arcname=arcname)
+
+
+def untar(archive, destination=None):
+    """
+    Extract every member of a tar archive.
+
+    :param archive: path of the tar archive
+    :param destination: directory to extract into; when not given (or otherwise false), a new
+                        directory is created with ``tempfile.mkdtemp()`` and is not removed by
+                        this function
+    :return: the directory the archive was extracted into
+    """
+    target_dir = destination or tempfile.mkdtemp()
+    with closing(tarfile.open(name=archive)) as opened_archive:
+        all_members = opened_archive.getmembers()
+        opened_archive.extractall(path=target_dir, members=all_members)
+    return target_dir
+
+
+def zip(source, destination):
+    """
+    Create a zip archive containing every file found under the ``source`` directory.
+
+    Files are stored under paths relative to the parent of ``source``, so a directory ``/a/b``
+    ends up as ``b/...`` inside the archive. Only files are written; directories that contain
+    no files get no entry.
+
+    NOTE: this function shadows the ``zip`` builtin within this module.
+
+    :param source: path of the directory to archive
+    :param destination: path of the archive file to write
+    :return: ``destination``
+    """
+    # normpath() drops a trailing separator, which would otherwise make dirname() return the
+    # directory itself and strip its name from the stored paths; this is loop-invariant, so it
+    # is computed once
+    source_dir = os.path.dirname(os.path.normpath(source))
+    with closing(zipfile.ZipFile(destination, 'w')) as zip_file:
+        for root, _, files in os.walk(source):
+            for filename in files:
+                file_path = os.path.join(root, filename)
+                zip_file.write(file_path, os.path.relpath(file_path, source_dir))
+    return destination
+
+
+def unzip(archive, destination=None):
+    """
+    Extract every member of a zip archive.
+
+    :param archive: path of the zip archive
+    :param destination: directory to extract into; when not given (or otherwise false), a new
+                        directory is created with ``tempfile.mkdtemp()`` and is not removed by
+                        this function
+    :return: the directory the archive was extracted into
+    """
+    target_dir = destination or tempfile.mkdtemp()
+    with closing(zipfile.ZipFile(archive, 'r')) as opened_archive:
+        opened_archive.extractall(target_dir)
+    return target_dir

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/exceptions.py
----------------------------------------------------------------------
diff --git a/aria/utils/exceptions.py b/aria/utils/exceptions.py
index 9e3e80f..b60cee4 100644
--- a/aria/utils/exceptions.py
+++ b/aria/utils/exceptions.py
@@ -15,6 +15,7 @@
 
 import sys
 import linecache
+import StringIO
 import traceback as tb
 
 import jsonpickle
@@ -89,6 +90,16 @@ def _print_stack(frame):
                 puts(line)
 
 
+def get_exception_as_string(exc_type, exc_val, traceback):
+    """
+    Render an exception as the text ``traceback.print_exception`` would print for it.
+
+    NOTE: the ``traceback`` parameter shadows the stdlib module of that name, which this file
+    imports under the alias ``tb``.
+
+    :param exc_type: exception class
+    :param exc_val: exception instance
+    :param traceback: traceback object, or a false value to render only the exception line(s)
+    :return: the formatted traceback and exception as a single string
+    """
+    # format_exception() returns the same lines print_exception() would write, so no
+    # intermediate file-like buffer is needed; the arguments are passed positionally because
+    # the keyword names of these functions are not stable across Python versions
+    return ''.join(tb.format_exception(exc_type, exc_val, traceback))
+
+
 class _WrappedException(Exception):
 
     def __init__(self, exception_type, exception_str):

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/file.py
----------------------------------------------------------------------
diff --git a/aria/utils/file.py b/aria/utils/file.py
index b515f70..6d1aa16 100644
--- a/aria/utils/file.py
+++ b/aria/utils/file.py
@@ -15,6 +15,7 @@
 
 import errno
 import os
+import shutil
 
 
 def makedirs(path):
@@ -26,3 +27,15 @@ def makedirs(path):
     except IOError as e:
         if e.errno != errno.EEXIST:
             raise
+
+def remove_if_exists(path):
+    """
+    Remove the file, symbolic link or directory tree at ``path``; do nothing if it is absent.
+
+    :param path: filesystem path to remove
+    :raises OSError: if removal fails for any reason other than the path not existing
+    """
+    try:
+        # symbolic links are checked first and removed as links: isdir() follows links, and
+        # shutil.rmtree() refuses to operate on a link to a directory
+        if os.path.islink(path) or os.path.isfile(path):
+            os.remove(path)
+        elif os.path.isdir(path):
+            shutil.rmtree(path)
+    except OSError as e:
+        # the path may vanish between the check and the removal; that is not an error here
+        if e.errno != errno.ENOENT:  # errno.ENOENT = no such file or directory
+            raise  # re-raise exception if a different error occurred

http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/8e5a1ec2/aria/utils/formatting.py
----------------------------------------------------------------------
diff --git a/aria/utils/formatting.py b/aria/utils/formatting.py
index 8a223e9..b5e141d 100644
--- a/aria/utils/formatting.py
+++ b/aria/utils/formatting.py
@@ -83,6 +83,34 @@ def full_type_name(value):
     return name if module == '__builtin__' else '%s.%s' % (module, name)
 
 
+def _decode_value(value):
+    """
+    Convert a single value: ``unicode`` is encoded to a UTF-8 byte string, lists and dicts are
+    converted recursively, and anything else is returned unchanged.
+    """
+    if isinstance(value, unicode):
+        return value.encode('utf-8')
+    if isinstance(value, list):
+        return decode_list(value)
+    if isinstance(value, dict):
+        return decode_dict(value)
+    return value
+
+
+def decode_list(data):
+    """
+    Return a new list in which every ``unicode`` item of ``data`` is replaced by its UTF-8
+    encoded byte string, recursing into nested lists and dicts.
+
+    :param data: iterable of items to convert
+    :return: a new list; ``data`` itself is not modified
+    """
+    return [_decode_value(item) for item in data]
+
+
+def decode_dict(data):
+    """
+    Return a new dict in which every ``unicode`` key and value of ``data`` is replaced by its
+    UTF-8 encoded byte string, recursing into nested lists and dicts.
+
+    :param data: dict to convert
+    :return: a new dict; ``data`` itself is not modified
+    """
+    decoded_dict = {}
+    for key, value in data.iteritems():
+        # keys only need the string conversion: lists and dicts cannot be dict keys
+        if isinstance(key, unicode):
+            key = key.encode('utf-8')
+        decoded_dict[key] = _decode_value(value)
+    return decoded_dict
+
+
 def safe_str(value):
     """
     Like :code:`str` coercion, but makes sure that Unicode strings are properly



Mime
View raw message