Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4890A200C0B for ; Sun, 29 Jan 2017 16:12:00 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4719F160B36; Sun, 29 Jan 2017 15:12:00 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 4B97D160B4F for ; Sun, 29 Jan 2017 16:11:58 +0100 (CET) Received: (qmail 2158 invoked by uid 500); 29 Jan 2017 15:11:57 -0000 Mailing-List: contact dev-help@ariatosca.incubator.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@ariatosca.incubator.apache.org Delivered-To: mailing list dev@ariatosca.incubator.apache.org Received: (qmail 2147 invoked by uid 99); 29 Jan 2017 15:11:57 -0000 Received: from pnap-us-west-generic-nat.apache.org (HELO spamd4-us-west.apache.org) (209.188.14.142) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 29 Jan 2017 15:11:57 +0000 Received: from localhost (localhost [127.0.0.1]) by spamd4-us-west.apache.org (ASF Mail Server at spamd4-us-west.apache.org) with ESMTP id E9B66C14FD for ; Sun, 29 Jan 2017 15:11:56 +0000 (UTC) X-Virus-Scanned: Debian amavisd-new at spamd4-us-west.apache.org X-Spam-Flag: NO X-Spam-Score: -2.983 X-Spam-Level: X-Spam-Status: No, score=-2.983 tagged_above=-999 required=6.31 tests=[FILL_THIS_FORM_LOAN=2.237, HK_RANDOM_FROM=0.999, KAM_ASCII_DIVIDERS=0.8, KAM_LAZY_DOMAIN_SECURITY=1, RCVD_IN_DNSWL_HI=-5, RCVD_IN_MSPIKE_H3=-0.01, RCVD_IN_MSPIKE_WL=-0.01, RP_MATCHES_RCVD=-2.999] autolearn=disabled Received: from mx1-lw-eu.apache.org ([10.40.0.8]) by localhost (spamd4-us-west.apache.org [10.40.0.11]) (amavisd-new, port 10024) with ESMTP id DSnjFz9KyR2a for ; Sun, 29 Jan 2017 15:11:46 +0000 (UTC) Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx1-lw-eu.apache.org (ASF Mail Server at mx1-lw-eu.apache.org) with SMTP id D371A5FB47 for ; Sun, 29 Jan 2017 15:11:43 +0000 (UTC) Received: (qmail 1149 invoked by uid 99); 29 Jan 2017 15:11:43 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 29 Jan 2017 15:11:43 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id D2E58DFBDB; Sun, 29 Jan 2017 15:11:42 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: mxmrlv@apache.org To: dev@ariatosca.incubator.apache.org Date: Sun, 29 Jan 2017 15:11:46 -0000 Message-Id: <8666fa13f14e4a7687918fec9c8f28cc@git.apache.org> In-Reply-To: <6252d8e2d21240a9b45f660a76ecf688@git.apache.org> References: <6252d8e2d21240a9b45f660a76ecf688@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [5/5] incubator-ariatosca git commit: ARIA-44-Merge-parser-and-storage-models archived-at: Sun, 29 Jan 2017 15:12:00 -0000 ARIA-44-Merge-parser-and-storage-models Project: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/commit/68a81882 Tree: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/tree/68a81882 Diff: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/diff/68a81882 Branch: refs/heads/ARIA-44-Merge-parser-and-storage-models Commit: 68a81882307742417a41e786c10ceab5cb3370b6 Parents: 4447829 Author: mxmrlv Authored: Thu Jan 19 11:39:36 2017 +0200 Committer: mxmrlv Committed: Sun Jan 29 17:11:27 2017 +0200 ---------------------------------------------------------------------- aria/__init__.py | 49 +- aria/cli/commands.py | 2 +- aria/orchestrator/context/common.py | 22 +- aria/orchestrator/context/operation.py | 41 +- aria/orchestrator/context/serialization.py | 2 +- aria/orchestrator/context/toolbelt.py | 2 +- aria/orchestrator/context/workflow.py | 16 +- aria/orchestrator/runner.py | 10 +- aria/orchestrator/workflows/api/task.py | 120 +- aria/orchestrator/workflows/core/engine.py | 2 +- aria/orchestrator/workflows/core/task.py | 26 +- aria/orchestrator/workflows/executor/process.py | 17 +- aria/orchestrator/workflows/executor/thread.py | 2 +- aria/parser/modeling/__init__.py | 2 +- aria/parser/modeling/storage.py | 147 +- aria/storage/__init__.py | 11 +- aria/storage/base_model.py | 757 ---------- aria/storage/instrumentation.py | 4 +- aria/storage/model.py | 110 -- aria/storage/modeling/__init__.py | 37 + aria/storage/modeling/elements.py | 157 ++ aria/storage/modeling/instance_elements.py | 1247 ++++++++++++++++ aria/storage/modeling/model.py | 175 +++ aria/storage/modeling/orchestrator_elements.py | 461 ++++++ aria/storage/modeling/structure.py | 270 ++++ aria/storage/modeling/template_elements.py | 1352 ++++++++++++++++++ aria/storage/modeling/type.py | 275 ++++ aria/storage/modeling/utils.py | 146 ++ aria/storage/structure.py | 190 --- aria/storage/type.py | 120 -- aria/utils/application.py | 14 +- tests/mock/context.py | 4 +- tests/mock/models.py | 119 +- tests/mock/topology.py | 96 +- tests/orchestrator/context/test_operation.py | 119 +- .../context/test_resource_render.py | 2 +- tests/orchestrator/context/test_serialize.py | 17 +- tests/orchestrator/context/test_toolbelt.py | 72 +- tests/orchestrator/context/test_workflow.py | 14 +- .../orchestrator/execution_plugin/test_local.py | 61 +- tests/orchestrator/execution_plugin/test_ssh.py | 2 +- tests/orchestrator/test_runner.py | 9 +- tests/orchestrator/workflows/__init__.py | 2 +- tests/orchestrator/workflows/api/test_task.py | 76 +- .../workflows/builtin/test_execute_operation.py | 56 - .../workflows/builtin/test_install.py | 43 - .../workflows/builtin/test_uninstall.py | 44 - .../orchestrator/workflows/core/test_engine.py | 23 +- tests/orchestrator/workflows/core/test_task.py | 74 +- .../test_task_graph_into_exececution_graph.py | 14 +- .../workflows/executor/test_executor.py | 9 +- .../workflows/executor/test_process_executor.py | 10 +- .../executor/test_process_executor_extension.py | 13 +- .../test_process_executor_tracked_changes.py | 49 +- tests/resources/scripts/test_ssh.sh | 30 +- tests/storage/__init__.py | 12 +- tests/storage/test_instrumentation.py | 9 +- tests/storage/test_model_storage.py | 176 ++- tests/storage/test_models.py | 919 ------------ 59 files changed, 4877 insertions(+), 2983 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/__init__.py ---------------------------------------------------------------------- diff --git a/aria/__init__.py b/aria/__init__.py index 248aa1a..7f9fe8c 100644 --- a/aria/__init__.py +++ b/aria/__init__.py @@ -61,25 +61,46 @@ def application_model_storage(api, api_kwargs=None): """ Initiate model storage """ - models = [ - storage.model.Plugin, + models_to_register = [ + storage.modeling.model.Parameter, - storage.model.Blueprint, - storage.model.Deployment, - storage.model.DeploymentUpdate, - storage.model.DeploymentUpdateStep, - storage.model.DeploymentModification, + storage.modeling.model.MappingTemplate, + storage.modeling.model.SubstitutionTemplate, + storage.modeling.model.ServiceTemplate, + storage.modeling.model.NodeTemplate, + storage.modeling.model.GroupTemplate, + storage.modeling.model.InterfaceTemplate, + storage.modeling.model.OperationTemplate, + storage.modeling.model.ArtifactTemplate, + storage.modeling.model.PolicyTemplate, + storage.modeling.model.GroupPolicyTemplate, + storage.modeling.model.GroupPolicyTriggerTemplate, + storage.modeling.model.RequirementTemplate, + storage.modeling.model.CapabilityTemplate, - storage.model.Node, - storage.model.NodeInstance, - storage.model.Relationship, - storage.model.RelationshipInstance, + storage.modeling.model.Mapping, + storage.modeling.model.Substitution, + storage.modeling.model.ServiceInstance, + storage.modeling.model.Node, + storage.modeling.model.Group, + storage.modeling.model.Interface, + storage.modeling.model.Operation, + storage.modeling.model.Capability, + storage.modeling.model.Artifact, + storage.modeling.model.Policy, + storage.modeling.model.GroupPolicy, + storage.modeling.model.GroupPolicyTrigger, + storage.modeling.model.Relationship, - storage.model.Execution, - storage.model.Task, + storage.modeling.model.Execution, + storage.modeling.model.ServiceInstanceUpdate, + storage.modeling.model.ServiceInstanceUpdateStep, + storage.modeling.model.ServiceInstanceModification, + storage.modeling.model.Plugin, + storage.modeling.model.Task ] # if api not in _model_storage: - return storage.ModelStorage(api, items=models, api_kwargs=api_kwargs or {}) + return storage.ModelStorage(api, items=models_to_register, api_kwargs=api_kwargs or {}) def application_resource_storage(api, api_kwargs=None): http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/cli/commands.py ---------------------------------------------------------------------- diff --git a/aria/cli/commands.py b/aria/cli/commands.py index 0890cd1..45c866a 100644 --- a/aria/cli/commands.py +++ b/aria/cli/commands.py @@ -366,7 +366,7 @@ class ExecuteCommand(BaseCommand): FileSystemResourceDriver(local_resource_storage())) model_storage = application_model_storage( FileSystemModelDriver(local_model_storage())) - deployment = model_storage.deployment.get(args_namespace.deployment_id) + deployment = model_storage.service_instance.get(args_namespace.deployment_id) try: workflow = deployment.workflows[args_namespace.workflow_id] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/context/common.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/common.py b/aria/orchestrator/context/common.py index 6ab27ef..37482cf 100644 --- a/aria/orchestrator/context/common.py +++ b/aria/orchestrator/context/common.py @@ -31,7 +31,7 @@ class BaseContext(logger.LoggerMixin): def __init__( self, name, - deployment_id, + service_instance_id, model_storage, resource_storage, workdir=None, @@ -41,13 +41,13 @@ class BaseContext(logger.LoggerMixin): self._id = str(uuid4()) self._model = model_storage self._resource = resource_storage - self._deployment_id = deployment_id + self._service_instance_id = service_instance_id self._workdir = workdir def __repr__(self): return ( '{name}(name={self.name}, ' - 'deployment_id={self._deployment_id}, ' + 'deployment_id={self._service_instance_id}, ' .format(name=self.__class__.__name__, self=self)) @property @@ -67,18 +67,18 @@ class BaseContext(logger.LoggerMixin): return self._resource @property - def blueprint(self): + def service_template(self): """ The blueprint model """ - return self.deployment.blueprint + return self.service_instance.service_template @property - def deployment(self): + def service_instance(self): """ The deployment model """ - return self.model.deployment.get(self._deployment_id) + return self.model.service_instance.get(self._service_instance_id) @property def name(self): @@ -101,11 +101,11 @@ class BaseContext(logger.LoggerMixin): Download a blueprint resource from the resource storage """ try: - self.resource.deployment.download(entry_id=str(self.deployment.id), + self.resource.deployment.download(entry_id=str(self.service_instance.id), destination=destination, path=path) except exceptions.StorageError: - self.resource.blueprint.download(entry_id=str(self.blueprint.id), + self.resource.blueprint.download(entry_id=str(self.service_template.id), destination=destination, path=path) @@ -126,9 +126,9 @@ class BaseContext(logger.LoggerMixin): Read a deployment resource as string from the resource storage """ try: - return self.resource.deployment.read(entry_id=str(self.deployment.id), path=path) + return self.resource.deployment.read(entry_id=str(self.service_instance.id), path=path) except exceptions.StorageError: - return self.resource.blueprint.read(entry_id=str(self.blueprint.id), path=path) + return self.resource.deployment.read(entry_id=str(self.service_template.id), path=path) def get_resource_and_render(self, path=None, variables=None): """ http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/context/operation.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/operation.py b/aria/orchestrator/context/operation.py index 23a6fd4..75a6b7f 100644 --- a/aria/orchestrator/context/operation.py +++ b/aria/orchestrator/context/operation.py @@ -30,7 +30,7 @@ class BaseOperationContext(BaseContext): name, model_storage, resource_storage, - deployment_id, + service_instance_id, task_id, actor_id, **kwargs): @@ -38,7 +38,7 @@ class BaseOperationContext(BaseContext): name=name, model_storage=model_storage, resource_storage=resource_storage, - deployment_id=deployment_id, + service_instance_id=service_instance_id, **kwargs) self._task_id = task_id self._actor_id = actor_id @@ -68,7 +68,7 @@ class BaseOperationContext(BaseContext): if not self.task.plugin_name: return None plugin_workdir = '{0}/plugins/{1}/{2}'.format(self._workdir, - self.deployment.id, + self.service_instance.id, self.task.plugin_name) file.makedirs(plugin_workdir) return plugin_workdir @@ -79,20 +79,20 @@ class NodeOperationContext(BaseOperationContext): Context for node based operations. """ @property - def node(self): + def node_template(self): """ the node of the current operation :return: """ - return self.node_instance.node + return self.node.node_template @property - def node_instance(self): + def node(self): """ The node instance of the current operation :return: """ - return self.model.node_instance.get(self._actor_id) + return self.model.node.get(self._actor_id) class RelationshipOperationContext(BaseOperationContext): @@ -100,50 +100,41 @@ class RelationshipOperationContext(BaseOperationContext): Context for relationship based operations. """ @property - def source_node(self): + def source_node_template(self): """ The source node :return: """ - return self.relationship.source_node + return self.source_node.node_template @property - def source_node_instance(self): + def source_node(self): """ The source node instance :return: """ - return self.relationship_instance.source_node_instance + return self.relationship.source_node @property - def target_node(self): + def target_node_template(self): """ The target node :return: """ - return self.relationship.target_node + return self.target_node.node_template @property - def target_node_instance(self): + def target_node(self): """ The target node instance :return: """ - return self.relationship_instance.target_node_instance + return self.relationship.target_node @property def relationship(self): """ - The relationship of the current operation - :return: - """ - - return self.relationship_instance.relationship - - @property - def relationship_instance(self): - """ The relationship instance of the current operation :return: """ - return self.model.relationship_instance.get(self._actor_id) + return self.model.relationship.get(self._actor_id) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/context/serialization.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/serialization.py b/aria/orchestrator/context/serialization.py index 760818f..dd88041 100644 --- a/aria/orchestrator/context/serialization.py +++ b/aria/orchestrator/context/serialization.py @@ -23,7 +23,7 @@ def operation_context_to_dict(context): context_cls = context.__class__ context_dict = { 'name': context.name, - 'deployment_id': context._deployment_id, + 'service_instance_id': context._service_instance_id, 'task_id': context._task_id, 'actor_id': context._actor_id, 'workdir': context._workdir http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/context/toolbelt.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/toolbelt.py b/aria/orchestrator/context/toolbelt.py index 301b013..def7d42 100644 --- a/aria/orchestrator/context/toolbelt.py +++ b/aria/orchestrator/context/toolbelt.py @@ -33,7 +33,7 @@ class NodeToolBelt(object): :return: """ assert isinstance(self._op_context, operation.NodeOperationContext) - host = self._op_context.node_instance.host + host = self._op_context.node.host return host.runtime_properties.get('ip') http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/context/workflow.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/context/workflow.py b/aria/orchestrator/context/workflow.py index a15790e..4a8d94f 100644 --- a/aria/orchestrator/context/workflow.py +++ b/aria/orchestrator/context/workflow.py @@ -49,18 +49,16 @@ class WorkflowContext(BaseContext): def __repr__(self): return ( - '{name}(deployment_id={self._deployment_id}, ' + '{name}(deployment_id={self._service_instance_id}, ' 'workflow_name={self._workflow_name}'.format( name=self.__class__.__name__, self=self)) def _create_execution(self): - execution_cls = self.model.execution.model_cls now = datetime.utcnow() execution = self.model.execution.model_cls( - deployment=self.deployment, + service_instance=self.service_instance, workflow_name=self._workflow_name, created_at=now, - status=execution_cls.PENDING, parameters=self.parameters, ) self.model.execution.put(execution) @@ -81,27 +79,27 @@ class WorkflowContext(BaseContext): self.model.execution.put(value) @property - def nodes(self): + def node_templates(self): """ Iterator over nodes """ - key = 'deployment_{0}'.format(self.model.node.model_cls.name_column_name()) + key = 'deployment_{0}'.format(self.model.node_template.model_cls.name_column_name()) return self.model.node.iter( filters={ - key: getattr(self.deployment, self.deployment.name_column_name()) + key: getattr(self.service_instance, self.service_instance.node_template()) } ) @property - def node_instances(self): + def nodes(self): """ Iterator over node instances """ key = 'deployment_{0}'.format(self.model.node_instance.model_cls.name_column_name()) return self.model.node_instance.iter( filters={ - key: getattr(self.deployment, self.deployment.name_column_name()) + key: getattr(self.service_instance, self.service_instance.name_column_name()) } ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/runner.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/runner.py b/aria/orchestrator/runner.py index 16acc19..c0a303e 100644 --- a/aria/orchestrator/runner.py +++ b/aria/orchestrator/runner.py @@ -50,7 +50,7 @@ class Runner(object): """ def __init__(self, workflow_name, workflow_fn, inputs, initialize_model_storage_fn, - deployment_id, storage_path='', is_storage_temporary=True): + service_instance_id, storage_path='', is_storage_temporary=True): if storage_path == '': # Temporary file storage the_file, storage_path = tempfile.mkstemp(suffix='.db', prefix='aria-') @@ -59,7 +59,7 @@ class Runner(object): self._storage_path = storage_path self._is_storage_temporary = is_storage_temporary - workflow_context = self.create_workflow_context(workflow_name, deployment_id, + workflow_context = self.create_workflow_context(workflow_name, service_instance_id, initialize_model_storage_fn) tasks_graph = workflow_fn(ctx=workflow_context, **inputs) @@ -75,7 +75,7 @@ class Runner(object): finally: self.cleanup() - def create_workflow_context(self, workflow_name, deployment_id, initialize_model_storage_fn): + def create_workflow_context(self, workflow_name, service_instance_id, initialize_model_storage_fn): model_storage = self.create_sqlite_model_storage() initialize_model_storage_fn(model_storage) resource_storage = self.create_fs_resource_storage() @@ -83,7 +83,7 @@ class Runner(object): name=workflow_name, model_storage=model_storage, resource_storage=resource_storage, - deployment_id=deployment_id, + service_instance_id=service_instance_id, workflow_name=self.__class__.__name__, task_max_attempts=1, task_retry_interval=1) @@ -106,7 +106,7 @@ class Runner(object): 'sqlite:///%s%s' % (path_prefix, self._storage_path)) # Models - model.DeclarativeBase.metadata.create_all(bind=sqlite_engine) # @UndefinedVariable + model.DB.metadata.create_all(bind=sqlite_engine) # @UndefinedVariable # Session sqlite_session_factory = orm.sessionmaker(bind=sqlite_engine) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/workflows/api/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/api/task.py b/aria/orchestrator/workflows/api/task.py index 44715c1..546cfde 100644 --- a/aria/orchestrator/workflows/api/task.py +++ b/aria/orchestrator/workflows/api/task.py @@ -18,7 +18,7 @@ Provides the tasks to be entered into the task graph """ from uuid import uuid4 -from aria.storage import model +from aria.storage.modeling import model from ... import context from .. import exceptions @@ -57,13 +57,13 @@ class OperationTask(BaseTask): Represents an operation task in the task_graph """ - SOURCE_OPERATION = 'source_operations' - TARGET_OPERATION = 'target_operations' + SOURCE_OPERATION = 'source' + TARGET_OPERATION = 'target' def __init__(self, name, actor, - operation_mapping, + implementation, max_attempts=None, retry_interval=None, ignore_failure=None, @@ -76,12 +76,12 @@ class OperationTask(BaseTask): :param actor: the operation host on which this operation is registered. :param inputs: operation inputs. """ - assert isinstance(actor, (model.NodeInstance, - model.RelationshipInstance)) + assert isinstance(actor, (model.Node, + model.Relationship)) super(OperationTask, self).__init__() self.actor = actor self.name = '{name}.{actor.id}'.format(name=name, actor=actor) - self.operation_mapping = operation_mapping + self.implementation = implementation self.inputs = inputs or {} self.plugin = plugin or {} self.max_attempts = (self.workflow_context._task_max_attempts @@ -93,6 +93,12 @@ class OperationTask(BaseTask): self.runs_on = runs_on @classmethod + def _merge_inputs(cls, operation_inputs, additional_inputs=None): + final_inputs = dict((p.name, p.as_raw['value']) for p in operation_inputs) + final_inputs.update(additional_inputs or {}) + return final_inputs + + @classmethod def node_instance(cls, instance, name, inputs=None, *args, **kwargs): """ Represents a node based operation @@ -100,63 +106,97 @@ class OperationTask(BaseTask): :param instance: the node of which this operation belongs to. :param name: the name of the operation. """ - assert isinstance(instance, model.NodeInstance) - return cls._instance(instance=instance, - name=name, - operation_details=instance.node.operations[name], - inputs=inputs, - plugins=instance.node.plugins or [], - runs_on=model.Task.RUNS_ON_NODE_INSTANCE, - *args, - **kwargs) + assert isinstance(instance, model.Node) + interface_name = _get_interface_name(name) + interfaces = instance.interfaces.filter_by(name=interface_name) + if interfaces.count() > 1: + raise exceptions.TaskException( + "More than one interface with the same name {0} were found".format(name)) + elif interfaces.count() == 0: + raise exceptions.TaskException( + "No Interface with the name `{interface_name}` was found".format( + interface_name=interface_name) + ) + + operation_templates = interfaces[0].operations.filter_by(name=name) + if operation_templates.count() > 1: + raise exceptions.TaskException( + "More than one operation with the same name {0} were found".format(name)) + + elif operation_templates.count() == 0: + raise exceptions.TaskException( + "No Interface with the name `{operation_name}` was found".format( + operation_name=name) + ) + + return cls._instance( + instance=instance, + name=name, + operation_template=operation_templates[0], + plugins=instance.plugins or [], + runs_on=model.Task.RUNS_ON_NODE_INSTANCE, + inputs=cls._merge_inputs(operation_templates[0].inputs, inputs), + *args, + **kwargs) @classmethod - def relationship_instance(cls, instance, name, operation_end, inputs=None, *args, **kwargs): + def relationship_instance(cls, instance, name, inputs=None, *args, **kwargs): """ Represents a relationship based operation :param instance: the relationship of which this operation belongs to. :param name: the name of the operation. - :param operation_end: source or target end of the relationship, this corresponds directly - with 'source_operations' and 'target_operations' :param inputs any additional inputs to the operation """ - assert isinstance(instance, model.RelationshipInstance) - if operation_end not in [cls.TARGET_OPERATION, cls.SOURCE_OPERATION]: - raise exceptions.TaskException('The operation end should be {0} or {1}'.format( - cls.TARGET_OPERATION, cls.SOURCE_OPERATION - )) - operation_details = getattr(instance.relationship, operation_end)[name] + assert isinstance(instance, model.Relationship) + operation_name, operation_end = name.rsplit('_', 1) + interface_name = _get_interface_name(operation_name) + interfaces = getattr(instance, operation_end + '_interfaces').filter_by(name=interface_name) + if interfaces.count() > 1: + raise exceptions.TaskException( + "More than one interface with the same name {0} found".format(interface_name)) + elif interfaces.count() == 0: + raise exceptions.TaskException( + "No Interface with the name `{interface_name}` was found".format( + interface_name=interface_name) + ) + + operations = interfaces.all()[0].operations.filter_by(name=operation_name) + if operations.count() > 1: + raise exceptions.TaskException( + "More than one operation with the same name {0} found".format(name)) + elif operations.count() == 0: + raise exceptions.TaskException( + "No Operation with the name `{operation_name}` was found".format( + operation_name=operation_name) + ) + if operation_end == cls.SOURCE_OPERATION: - plugins = instance.relationship.source_node.plugins + plugins = instance.source_node.plugins runs_on = model.Task.RUNS_ON_SOURCE + else: - plugins = instance.relationship.target_node.plugins + plugins = instance.target_node.plugins runs_on = model.Task.RUNS_ON_TARGET return cls._instance(instance=instance, name=name, - operation_details=operation_details, - inputs=inputs, + operation_template=operations[0], plugins=plugins or [], runs_on=runs_on, + inputs=cls._merge_inputs(operations[0].inputs, inputs), *args, **kwargs) @classmethod - def _instance(cls, instance, name, operation_details, inputs, plugins, runs_on, *args, - **kwargs): - operation_mapping = operation_details.get('operation') - operation_inputs = operation_details.get('inputs', {}) - operation_inputs.update(inputs or {}) - plugin_name = operation_details.get('plugin') - matching_plugins = [p for p in plugins if p['name'] == plugin_name] + def _instance(cls, instance, name, operation_template, inputs, plugins, runs_on, *args, **kwargs): + matching_plugins = [p for p in plugins if p['name'] == operation_template.plugin] # All matching plugins should have identical package_name/package_version, so it's safe to # take the first found. plugin = matching_plugins[0] if matching_plugins else {} return cls(actor=instance, name=name, - operation_mapping=operation_mapping, - inputs=operation_inputs, + implementation=operation_template.implementation, + inputs=inputs, plugin=plugin, runs_on=runs_on, *args, @@ -197,3 +237,7 @@ class StubTask(BaseTask): Enables creating empty tasks. """ pass + + +def _get_interface_name(operation_name): + return operation_name.rsplit('.', 1)[0] http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/workflows/core/engine.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/engine.py b/aria/orchestrator/workflows/core/engine.py index fd83614..55b4159 100644 --- a/aria/orchestrator/workflows/core/engine.py +++ b/aria/orchestrator/workflows/core/engine.py @@ -23,7 +23,7 @@ from datetime import datetime import networkx from aria import logger -from aria.storage import model +from aria.storage.modeling import model from aria.orchestrator import events from .. import exceptions http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/workflows/core/task.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/core/task.py b/aria/orchestrator/workflows/core/task.py index f65fc0d..d0e1363 100644 --- a/aria/orchestrator/workflows/core/task.py +++ b/aria/orchestrator/workflows/core/task.py @@ -24,7 +24,7 @@ from functools import ( ) from aria import logger -from aria.storage import model +from aria.storage.modeling import model from aria.orchestrator.context import operation as operation_context from .. import exceptions @@ -109,11 +109,11 @@ class OperationTask(BaseTask): model_storage = api_task._workflow_context.model base_task_model = model_storage.task.model_cls - if isinstance(api_task.actor, model.NodeInstance): - context_class = operation_context.NodeOperationContext + if isinstance(api_task.actor, model.Node): + context_cls = operation_context.NodeOperationContext task_model_cls = base_task_model.as_node_instance - elif isinstance(api_task.actor, model.RelationshipInstance): - context_class = operation_context.RelationshipOperationContext + elif isinstance(api_task.actor, model.Relationship): + context_cls = operation_context.RelationshipOperationContext task_model_cls = base_task_model.as_relationship_instance else: raise RuntimeError('No operation context could be created for {actor.model_cls}' @@ -127,7 +127,7 @@ class OperationTask(BaseTask): # package_name and package_version operation_task = task_model_cls( name=api_task.name, - operation_mapping=api_task.operation_mapping, + implementation=api_task.implementation, instance=api_task.actor, inputs=api_task.inputs, status=base_task_model.PENDING, @@ -141,13 +141,13 @@ class OperationTask(BaseTask): ) self._workflow_context.model.task.put(operation_task) - self._ctx = context_class(name=api_task.name, - model_storage=self._workflow_context.model, - resource_storage=self._workflow_context.resource, - deployment_id=self._workflow_context._deployment_id, - task_id=operation_task.id, - actor_id=api_task.actor.id, - workdir=self._workflow_context._workdir) + self._ctx = context_cls(name=api_task.name, + model_storage=self._workflow_context.model, + resource_storage=self._workflow_context.resource, + service_instance_id=self._workflow_context._service_instance_id, + task_id=operation_task.id, + actor_id=api_task.actor.id, + workdir=self._workflow_context._workdir) self._task_id = operation_task.id self._update_fields = None http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/workflows/executor/process.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/process.py b/aria/orchestrator/workflows/executor/process.py index 7d990fa..6798854 100644 --- a/aria/orchestrator/workflows/executor/process.py +++ b/aria/orchestrator/workflows/executor/process.py @@ -19,8 +19,8 @@ Subprocess based executor # pylint: disable=wrong-import-position -import sys import os +import sys # As part of the process executor implementation, subprocess are started with this module as their # entry point. We thus remove this module's directory from the python path if it happens to be @@ -47,7 +47,7 @@ from aria.utils import exceptions from aria.orchestrator.workflows.executor import base from aria.orchestrator.context import serialization from aria.storage import instrumentation -from aria.storage import type as storage_type +from aria.storage.modeling import type as storage_type _IS_WIN = os.name == 'nt' @@ -190,7 +190,7 @@ class ProcessExecutor(base.BaseExecutor): def _create_arguments_dict(self, task): return { 'task_id': task.id, - 'operation_mapping': task.operation_mapping, + 'implementation': task.implementation, 'operation_inputs': task.inputs, 'port': self._server_port, 'context': serialization.operation_context_to_dict(task.context), @@ -281,9 +281,9 @@ def _patch_session(ctx, messenger, instrument): if not ctx.model: return - # We arbitrarily select the ``node_instance`` mapi to extract the session from it. + # We arbitrarily select the ``node`` mapi to extract the session from it. # could have been any other mapi just as well - session = ctx.model.node_instance._session + session = ctx.model.node._session original_refresh = session.refresh def patched_refresh(target): @@ -304,6 +304,9 @@ def _patch_session(ctx, messenger, instrument): def _main(): + # import pydevd + # pydevd.settrace('localhost', port=5678, stdoutToServer=True, stderrToServer=True, suspend=False) + arguments_json_path = sys.argv[1] with open(arguments_json_path) as f: arguments = jsonpickle.loads(f.read()) @@ -317,7 +320,7 @@ def _main(): messenger = _Messenger(task_id=task_id, port=port) messenger.started() - operation_mapping = arguments['operation_mapping'] + implementation = arguments['implementation'] operation_inputs = arguments['operation_inputs'] context_dict = arguments['context'] @@ -329,7 +332,7 @@ def _main(): try: ctx = serialization.operation_context_from_dict(context_dict) _patch_session(ctx=ctx, messenger=messenger, instrument=instrument) - task_func = imports.load_attribute(operation_mapping) + task_func = imports.load_attribute(implementation) aria.install_aria_extensions() for decorate in process_executor.decorate(): task_func = decorate(task_func) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/orchestrator/workflows/executor/thread.py ---------------------------------------------------------------------- diff --git a/aria/orchestrator/workflows/executor/thread.py b/aria/orchestrator/workflows/executor/thread.py index 1a6ad9f..7ae0217 100644 --- a/aria/orchestrator/workflows/executor/thread.py +++ b/aria/orchestrator/workflows/executor/thread.py @@ -57,7 +57,7 @@ class ThreadExecutor(BaseExecutor): task = self._queue.get(timeout=1) self._task_started(task) try: - task_func = imports.load_attribute(task.operation_mapping) + task_func = imports.load_attribute(task.implementation) task_func(ctx=task.context, **task.inputs) self._task_succeeded(task) except BaseException as e: http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/parser/modeling/__init__.py ---------------------------------------------------------------------- diff --git a/aria/parser/modeling/__init__.py b/aria/parser/modeling/__init__.py index a1efd9f..2c4122d 100644 --- a/aria/parser/modeling/__init__.py +++ b/aria/parser/modeling/__init__.py @@ -24,7 +24,7 @@ from .model_elements import (ServiceModel, NodeTemplate, RequirementTemplate, Ca GroupPolicyTemplate, GroupPolicyTriggerTemplate, MappingTemplate, SubstitutionTemplate, InterfaceTemplate, OperationTemplate) from .types import TypeHierarchy, Type, RelationshipType, PolicyType, PolicyTriggerType -from .storage import initialize_storage +# from .storage import initialize_storage __all__ = ( 'CannotEvaluateFunctionException', http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/parser/modeling/storage.py ---------------------------------------------------------------------- diff --git a/aria/parser/modeling/storage.py b/aria/parser/modeling/storage.py index 46c3a7c..9c7412a 100644 --- a/aria/parser/modeling/storage.py +++ b/aria/parser/modeling/storage.py @@ -30,25 +30,19 @@ def initialize_storage(context, model_storage, deployment_id): model_storage.deployment.put(deployment) # Create nodes and node instances - for node_template in context.modeling.model.node_templates.itervalues(): - node = create_node(context, deployment, node_template) - model_storage.node.put(node) + for node_template in context.modeling.model.node_templates.values(): + model_storage.node_template.put(node_template) for a_node in context.modeling.instance.find_nodes(node_template.name): - node_instance = create_node_instance(node, a_node) - model_storage.node_instance.put(node_instance) + node = create_node_instance(deployment, node_template, a_node) + model_storage.node.put(node) # Create relationships - for node_template in context.modeling.model.node_templates.itervalues(): + for node_template in context.modeling.model.node_templates.values(): for index, requirement_template in enumerate(node_template.requirement_templates): # We are currently limited only to requirements for specific node templates! if requirement_template.target_node_template_name: - source = model_storage.node.get_by_name(node_template.name) - target = model_storage.node.get_by_name( - requirement_template.target_node_template_name) - relationship = create_relationship(context, source, target, - requirement_template.relationship_template) - model_storage.relationship.put(relationship) + model_storage.requirement_template.put(requirement_template) for node in context.modeling.instance.find_nodes(node_template.name): for relationship_model in node.relationships: @@ -58,10 +52,9 @@ def initialize_storage(context, model_storage, deployment_id): target_instance = \ model_storage.node_instance.get_by_name( relationship_model.target_node_id) - relationship_instance = \ - create_relationship_instance(relationship, source_instance, - target_instance) - model_storage.relationship_instance.put(relationship_instance) + relationship = \ + create_relationship_instance(source_instance, target_instance) + model_storage.relationship.put(relationship) def create_blueprint(context): @@ -71,131 +64,49 @@ def create_blueprint(context): name = context.modeling.model.metadata.values.get('template_name') except AttributeError: name = None - return model.Blueprint( + + return model.ServiceTemplate( plan={}, name=name or main_file_name, description=context.modeling.model.description or '', created_at=now, updated_at=now, - main_file_name=main_file_name) + main_file_name=main_file_name + ) -def create_deployment(context, blueprint, deployment_id): +def create_deployment(context, service_template, service_instance_id): now = datetime.utcnow() - return model.Deployment( - name='%s_%s' % (blueprint.name, deployment_id), - blueprint_fk=blueprint.id, + return model.ServiceInstance( + name='{0}_{1}' % (service_template.name, service_instance_id), + service_template=service_template, description=context.modeling.instance.description or '', created_at=now, updated_at=now, workflows={}, - inputs={}, - groups={}, permalink='', policy_triggers={}, - policy_types={}, - outputs={}, - scaling_groups={}) + scaling_groups={} + ) -def create_node(context, deployment, node_template): - operations = create_operations(context, node_template.interface_templates, '_dry_node') +def create_node_instance(service_instance, node, node_model): return model.Node( - name=node_template.name, - type=node_template.type_name, - type_hierarchy=[], - number_of_instances=node_template.default_instances, - planned_number_of_instances=node_template.default_instances, - deploy_number_of_instances=node_template.default_instances, - properties={}, - operations=operations, - min_number_of_instances=node_template.min_instances, - max_number_of_instances=node_template.max_instances or 100, - deployment_fk=deployment.id) - - -def create_relationship(context, source, target, relationship_template): - if relationship_template: - source_operations = create_operations(context, - relationship_template.source_interface_templates, - '_dry_relationship') - target_operations = create_operations(context, - relationship_template.target_interface_templates, - '_dry_relationship') - else: - source_operations = {} - target_operations = {} - return model.Relationship( - source_node_fk=source.id, - target_node_fk=target.id, - source_interfaces={}, - source_operations=source_operations, - target_interfaces={}, - target_operations=target_operations, - type='rel_type', - type_hierarchy=[], - properties={}) - - -def create_node_instance(node, node_model): - return model.NodeInstance( + service_instance=service_instance, name=node_model.id, runtime_properties={}, version=None, - node_fk=node.id, + node_template=node, state='', - scaling_groups=[]) - - -def create_relationship_instance(relationship, source_instance, target_instance): - return model.RelationshipInstance( - relationship_fk=relationship.id, - source_node_instance_fk=source_instance.id, - target_node_instance_fk=target_instance.id) - - -def create_operations(context, interfaces, fn_name): - operations = {} - for interface in interfaces.itervalues(): - operations[interface.type_name] = {} - for oper in interface.operation_templates.itervalues(): - name = '%s.%s' % (interface.type_name, oper.name) - operations[name] = { - 'operation': '%s.%s' % (__name__, fn_name), - 'inputs': { - '_plugin': None, - '_implementation': None}} - if oper.implementation: - plugin, implementation = _parse_implementation(context, oper.implementation) - operations[name]['inputs']['_plugin'] = plugin - operations[name]['inputs']['_implementation'] = implementation - - return operations + scaling_groups=[] + ) -def _parse_implementation(context, implementation): - index = implementation.find('>') - if index == -1: - return 'execution', implementation - plugin = implementation[:index].strip() - - # TODO: validation should happen in parser - if (plugin != 'execution') and (_get_plugin(context, plugin) is None): - raise ValueError('unknown plugin: "%s"' % plugin) - - implementation = implementation[index+1:].strip() - return plugin, implementation - - -def _get_plugin(context, plugin_name): - def is_plugin(type_name): - return context.modeling.policy_types.get_role(type_name) == 'plugin' - - for policy in context.modeling.instance.policies.itervalues(): - if (policy.name == plugin_name) and is_plugin(policy.type_name): - return policy - - return None +def create_relationship_instance(source_instance, target_instance): + return model.Relationship( + source_node=source_instance, + target_node=target_instance + ) _TERMINAL_LOCK = RLock() http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/storage/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/__init__.py b/aria/storage/__init__.py index a1c07d7..82bdc7e 100644 --- a/aria/storage/__init__.py +++ b/aria/storage/__init__.py @@ -42,14 +42,17 @@ from .core import ( ModelStorage, ResourceStorage, ) +from .modeling import ( + structure, + model, + model_base +) from . import ( exceptions, api, - structure, core, filesystem_rapi, sql_mapi, - model ) __all__ = ( @@ -60,5 +63,7 @@ __all__ = ( 'ResourceStorage', 'filesystem_rapi', 'sql_mapi', - 'api' + 'api', + 'model', + 'model_base', ) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/storage/base_model.py ---------------------------------------------------------------------- diff --git a/aria/storage/base_model.py b/aria/storage/base_model.py deleted file mode 100644 index f7d0e5b..0000000 --- a/aria/storage/base_model.py +++ /dev/null @@ -1,757 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Aria's storage.models module -Path: aria.storage.models - -models module holds aria's models. - -classes: - * Field - represents a single field. - * IterField - represents an iterable field. - * Model - abstract model implementation. - * Snapshot - snapshots implementation model. - * Deployment - deployment implementation model. - * DeploymentUpdateStep - deployment update step implementation model. - * DeploymentUpdate - deployment update implementation model. - * DeploymentModification - deployment modification implementation model. - * Execution - execution implementation model. - * Node - node implementation model. - * Relationship - relationship implementation model. - * NodeInstance - node instance implementation model. - * RelationshipInstance - relationship instance implementation model. - * Plugin - plugin implementation model. -""" -from collections import namedtuple -from datetime import datetime - -from sqlalchemy.ext.associationproxy import association_proxy -from sqlalchemy.ext.declarative import declared_attr -from sqlalchemy import ( - Column, - Integer, - Text, - DateTime, - Boolean, - Enum, - String, - Float, - orm, -) -from sqlalchemy.ext.orderinglist import ordering_list - -from ..orchestrator.exceptions import TaskAbortException, TaskRetryException -from .structure import ModelMixin -from .type import ( - List, - Dict -) - -__all__ = ( - 'BlueprintBase', - 'DeploymentBase', - 'DeploymentUpdateStepBase', - 'DeploymentUpdateBase', - 'DeploymentModificationBase', - 'ExecutionBase', - 'NodeBase', - 'RelationshipBase', - 'NodeInstanceBase', - 'RelationshipInstanceBase', - 'PluginBase', - 'TaskBase' -) - -#pylint: disable=no-self-argument, abstract-method - - -class BlueprintBase(ModelMixin): - """ - Blueprint model representation. - """ - __tablename__ = 'blueprints' - - created_at = Column(DateTime, nullable=False, index=True) - main_file_name = Column(Text, nullable=False) - plan = Column(Dict, nullable=False) - updated_at = Column(DateTime) - description = Column(Text) - - -class DeploymentBase(ModelMixin): - """ - Deployment model representation. - """ - __tablename__ = 'deployments' - - _private_fields = ['blueprint_fk'] - - created_at = Column(DateTime, nullable=False, index=True) - description = Column(Text) - inputs = Column(Dict) - groups = Column(Dict) - permalink = Column(Text) - policy_triggers = Column(Dict) - policy_types = Column(Dict) - outputs = Column(Dict) - scaling_groups = Column(Dict) - updated_at = Column(DateTime) - workflows = Column(Dict) - - @declared_attr - def blueprint_fk(cls): - return cls.foreign_key(BlueprintBase, nullable=False) - - @declared_attr - def blueprint(cls): - return cls.one_to_many_relationship('blueprint_fk') - - @declared_attr - def blueprint_name(cls): - return association_proxy('blueprint', cls.name_column_name()) - - -class ExecutionBase(ModelMixin): - """ - Execution model representation. - """ - # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. - __tablename__ = 'executions' - _private_fields = ['deployment_fk'] - - TERMINATED = 'terminated' - FAILED = 'failed' - CANCELLED = 'cancelled' - PENDING = 'pending' - STARTED = 'started' - CANCELLING = 'cancelling' - FORCE_CANCELLING = 'force_cancelling' - - STATES = [TERMINATED, FAILED, CANCELLED, PENDING, STARTED, CANCELLING, FORCE_CANCELLING] - END_STATES = [TERMINATED, FAILED, CANCELLED] - ACTIVE_STATES = [state for state in STATES if state not in END_STATES] - - VALID_TRANSITIONS = { - PENDING: [STARTED, CANCELLED], - STARTED: END_STATES + [CANCELLING], - CANCELLING: END_STATES + [FORCE_CANCELLING] - } - - @orm.validates('status') - def validate_status(self, key, value): - """Validation function that verifies execution status transitions are OK""" - try: - current_status = getattr(self, key) - except AttributeError: - return - valid_transitions = self.VALID_TRANSITIONS.get(current_status, []) - if all([current_status is not None, - current_status != value, - value not in valid_transitions]): - raise ValueError('Cannot change execution status from {current} to {new}'.format( - current=current_status, - new=value)) - return value - - created_at = Column(DateTime, index=True) - started_at = Column(DateTime, nullable=True, index=True) - ended_at = Column(DateTime, nullable=True, index=True) - error = Column(Text, nullable=True) - is_system_workflow = Column(Boolean, nullable=False, default=False) - parameters = Column(Dict) - status = Column(Enum(*STATES, name='execution_status'), default=PENDING) - workflow_name = Column(Text) - - @declared_attr - def blueprint(cls): - return association_proxy('deployment', 'blueprint') - - @declared_attr - def deployment_fk(cls): - return cls.foreign_key(DeploymentBase, nullable=True) - - @declared_attr - def deployment(cls): - return cls.one_to_many_relationship('deployment_fk') - - @declared_attr - def deployment_name(cls): - return association_proxy('deployment', cls.name_column_name()) - - @declared_attr - def blueprint_name(cls): - return association_proxy('deployment', 'blueprint_name') - - def __str__(self): - return '<{0} id=`{1}` (status={2})>'.format( - self.__class__.__name__, - getattr(self, self.name_column_name()), - self.status - ) - - -class DeploymentUpdateBase(ModelMixin): - """ - Deployment update model representation. - """ - # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. - steps = None - - __tablename__ = 'deployment_updates' - - _private_fields = ['execution_fk', 'deployment_fk'] - - created_at = Column(DateTime, nullable=False, index=True) - deployment_plan = Column(Dict, nullable=False) - deployment_update_node_instances = Column(Dict) - deployment_update_deployment = Column(Dict) - deployment_update_nodes = Column(List) - modified_entity_ids = Column(Dict) - state = Column(Text) - - @declared_attr - def execution_fk(cls): - return cls.foreign_key(ExecutionBase, nullable=True) - - @declared_attr - def execution(cls): - return cls.one_to_many_relationship('execution_fk') - - @declared_attr - def execution_name(cls): - return association_proxy('execution', cls.name_column_name()) - - @declared_attr - def deployment_fk(cls): - return cls.foreign_key(DeploymentBase) - - @declared_attr - def deployment(cls): - return cls.one_to_many_relationship('deployment_fk') - - @declared_attr - def deployment_name(cls): - return association_proxy('deployment', cls.name_column_name()) - - def to_dict(self, suppress_error=False, **kwargs): - dep_update_dict = super(DeploymentUpdateBase, self).to_dict(suppress_error) #pylint: disable=no-member - # Taking care of the fact the DeploymentSteps are _BaseModels - dep_update_dict['steps'] = [step.to_dict() for step in self.steps] - return dep_update_dict - - -class DeploymentUpdateStepBase(ModelMixin): - """ - Deployment update step model representation. - """ - # Needed only for pylint. the id will be populated by sqlalcehmy and the proper column. - __tablename__ = 'deployment_update_steps' - _private_fields = ['deployment_update_fk'] - - _action_types = namedtuple('ACTION_TYPES', 'ADD, REMOVE, MODIFY') - ACTION_TYPES = _action_types(ADD='add', REMOVE='remove', MODIFY='modify') - _entity_types = namedtuple( - 'ENTITY_TYPES', - 'NODE, RELATIONSHIP, PROPERTY, OPERATION, WORKFLOW, OUTPUT, DESCRIPTION, GROUP, ' - 'POLICY_TYPE, POLICY_TRIGGER, PLUGIN') - ENTITY_TYPES = _entity_types( - NODE='node', - RELATIONSHIP='relationship', - PROPERTY='property', - OPERATION='operation', - WORKFLOW='workflow', - OUTPUT='output', - DESCRIPTION='description', - GROUP='group', - POLICY_TYPE='policy_type', - POLICY_TRIGGER='policy_trigger', - PLUGIN='plugin' - ) - - action = Column(Enum(*ACTION_TYPES, name='action_type'), nullable=False) - entity_id = Column(Text, nullable=False) - entity_type = Column(Enum(*ENTITY_TYPES, name='entity_type'), nullable=False) - - @declared_attr - def deployment_update_fk(cls): - return cls.foreign_key(DeploymentUpdateBase) - - @declared_attr - def deployment_update(cls): - return cls.one_to_many_relationship('deployment_update_fk', backreference='steps') - - @declared_attr - def deployment_update_name(cls): - return association_proxy('deployment_update', cls.name_column_name()) - - def __hash__(self): - return hash((getattr(self, self.id_column_name()), self.entity_id)) - - def __lt__(self, other): - """ - the order is 'remove' < 'modify' < 'add' - :param other: - :return: - """ - if not isinstance(other, self.__class__): - return not self >= other - - if self.action != other.action: - if self.action == 'remove': - return_value = True - elif self.action == 'add': - return_value = False - else: - return_value = other.action == 'add' - return return_value - - if self.action == 'add': - return self.entity_type == 'node' and other.entity_type == 'relationship' - if self.action == 'remove': - return self.entity_type == 'relationship' and other.entity_type == 'node' - return False - - -class DeploymentModificationBase(ModelMixin): - """ - Deployment modification model representation. - """ - __tablename__ = 'deployment_modifications' - _private_fields = ['deployment_fk'] - - STARTED = 'started' - FINISHED = 'finished' - ROLLEDBACK = 'rolledback' - - STATES = [STARTED, FINISHED, ROLLEDBACK] - END_STATES = [FINISHED, ROLLEDBACK] - - context = Column(Dict) - created_at = Column(DateTime, nullable=False, index=True) - ended_at = Column(DateTime, index=True) - modified_nodes = Column(Dict) - node_instances = Column(Dict) - status = Column(Enum(*STATES, name='deployment_modification_status')) - - @declared_attr - def deployment_fk(cls): - return cls.foreign_key(DeploymentBase) - - @declared_attr - def deployment(cls): - return cls.one_to_many_relationship('deployment_fk', backreference='modifications') - - @declared_attr - def deployment_name(cls): - return association_proxy('deployment', cls.name_column_name()) - - -class NodeBase(ModelMixin): - """ - Node model representation. - """ - __tablename__ = 'nodes' - - # See base class for an explanation on these properties - is_id_unique = False - - _private_fields = ['blueprint_fk', 'host_fk'] - - @declared_attr - def host_fk(cls): - return cls.foreign_key(NodeBase, nullable=True) - - @declared_attr - def host(cls): - return cls.relationship_to_self('host_fk') - - @declared_attr - def host_name(cls): - return association_proxy('host', cls.name_column_name()) - - @declared_attr - def deployment_fk(cls): - return cls.foreign_key(DeploymentBase) - - @declared_attr - def deployment(cls): - return cls.one_to_many_relationship('deployment_fk') - - @declared_attr - def deployment_name(cls): - return association_proxy('deployment', cls.name_column_name()) - - @declared_attr - def blueprint_name(cls): - return association_proxy('deployment', 'blueprint_{0}'.format(cls.name_column_name())) - - deploy_number_of_instances = Column(Integer, nullable=False) - max_number_of_instances = Column(Integer, nullable=False) - min_number_of_instances = Column(Integer, nullable=False) - number_of_instances = Column(Integer, nullable=False) - planned_number_of_instances = Column(Integer, nullable=False) - plugins = Column(List) - properties = Column(Dict) - operations = Column(Dict) - type = Column(Text, nullable=False, index=True) - type_hierarchy = Column(List) - - -class RelationshipBase(ModelMixin): - """ - Relationship model representation. - """ - __tablename__ = 'relationships' - - _private_fields = ['source_node_fk', 'target_node_fk', 'source_position', 'target_position'] - - source_position = Column(Integer) - target_position = Column(Integer) - - @declared_attr - def deployment_id(self): - return association_proxy('source_node', 'deployment_id') - - @declared_attr - def source_node_fk(cls): - return cls.foreign_key(NodeBase) - - @declared_attr - def source_node(cls): - return cls.one_to_many_relationship( - 'source_node_fk', - backreference='outbound_relationships', - backref_kwargs=dict( - order_by=cls.source_position, - collection_class=ordering_list('source_position', count_from=0) - ) - ) - - @declared_attr - def source_name(cls): - return association_proxy('source_node', cls.name_column_name()) - - @declared_attr - def target_node_fk(cls): - return cls.foreign_key(NodeBase, nullable=True) - - @declared_attr - def target_node(cls): - return cls.one_to_many_relationship( - 'target_node_fk', - backreference='inbound_relationships', - backref_kwargs=dict( - order_by=cls.target_position, - collection_class=ordering_list('target_position', count_from=0) - ) - ) - - @declared_attr - def target_name(cls): - return association_proxy('target_node', cls.name_column_name()) - - source_interfaces = Column(Dict) - source_operations = Column(Dict, nullable=False) - target_interfaces = Column(Dict) - target_operations = Column(Dict, nullable=False) - type = Column(String, nullable=False) - type_hierarchy = Column(List) - properties = Column(Dict) - - -class NodeInstanceBase(ModelMixin): - """ - Node instance model representation. - """ - __tablename__ = 'node_instances' - _private_fields = ['node_fk', 'host_fk'] - - runtime_properties = Column(Dict) - scaling_groups = Column(List) - state = Column(Text, nullable=False) - version = Column(Integer, default=1) - - @declared_attr - def host_fk(cls): - return cls.foreign_key(NodeInstanceBase, nullable=True) - - @declared_attr - def host(cls): - return cls.relationship_to_self('host_fk') - - @declared_attr - def host_name(cls): - return association_proxy('host', cls.name_column_name()) - - @declared_attr - def deployment(cls): - return association_proxy('node', 'deployment') - - @declared_attr - def deployment_name(cls): - return association_proxy('node', 'deployment_name') - - @declared_attr - def node_fk(cls): - return cls.foreign_key(NodeBase, nullable=True) - - @declared_attr - def node(cls): - return cls.one_to_many_relationship('node_fk') - - @declared_attr - def node_name(cls): - return association_proxy('node', cls.name_column_name()) - - @property - def ip(self): - if not self.host_fk: - return None - host_node_instance = self.host - if 'ip' in host_node_instance.runtime_properties: # pylint: disable=no-member - return host_node_instance.runtime_properties['ip'] # pylint: disable=no-member - host_node = host_node_instance.node # pylint: disable=no-member - if 'ip' in host_node.properties: - return host_node.properties['ip'] - return None - - -class RelationshipInstanceBase(ModelMixin): - """ - Relationship instance model representation. - """ - __tablename__ = 'relationship_instances' - _private_fields = ['relationship_storage_fk', - 'source_node_instance_fk', - 'target_node_instance_fk', - 'source_position', - 'target_position'] - - source_position = Column(Integer) - target_position = Column(Integer) - - @declared_attr - def source_node_instance_fk(cls): - return cls.foreign_key(NodeInstanceBase, nullable=True) - - @declared_attr - def source_node_instance(cls): - return cls.one_to_many_relationship( - 'source_node_instance_fk', - backreference='outbound_relationship_instances', - backref_kwargs=dict( - order_by=cls.source_position, - collection_class=ordering_list('source_position', count_from=0) - ) - ) - - @declared_attr - def source_node_instance_name(cls): - return association_proxy('source_node_instance', 'node_{0}'.format(cls.name_column_name())) - - @declared_attr - def source_node_name(cls): - return association_proxy('source_node_instance', cls.name_column_name()) - - @declared_attr - def target_node_instance_fk(cls): - return cls.foreign_key(NodeInstanceBase, nullable=True) - - @declared_attr - def target_node_instance(cls): - return cls.one_to_many_relationship( - 'target_node_instance_fk', - backreference='inbound_relationship_instances', - backref_kwargs=dict( - order_by=cls.target_position, - collection_class=ordering_list('target_position', count_from=0) - ) - ) - - @declared_attr - def target_node_instance_name(cls): - return association_proxy('target_node_instance', cls.name_column_name()) - - @declared_attr - def target_node_name(cls): - return association_proxy('target_node_instance', 'node_{0}'.format(cls.name_column_name())) - - @declared_attr - def relationship_fk(cls): - return cls.foreign_key(RelationshipBase) - - @declared_attr - def relationship(cls): - return cls.one_to_many_relationship('relationship_fk') - - @declared_attr - def relationship_name(cls): - return association_proxy('relationship', cls.name_column_name()) - - - -class PluginBase(ModelMixin): - """ - Plugin model representation. - """ - __tablename__ = 'plugins' - - archive_name = Column(Text, nullable=False, index=True) - distribution = Column(Text) - distribution_release = Column(Text) - distribution_version = Column(Text) - package_name = Column(Text, nullable=False, index=True) - package_source = Column(Text) - package_version = Column(Text) - supported_platform = Column(Text) - supported_py_versions = Column(List) - uploaded_at = Column(DateTime, nullable=False, index=True) - wheels = Column(List, nullable=False) - - -class TaskBase(ModelMixin): - """ - A Model which represents an task - """ - __tablename__ = 'tasks' - _private_fields = ['node_instance_fk', 'relationship_instance_fk', 'execution_fk'] - - @declared_attr - def node_instance_fk(cls): - return cls.foreign_key(NodeInstanceBase, nullable=True) - - @declared_attr - def node_instance_name(cls): - return association_proxy('node_instance', cls.name_column_name()) - - @declared_attr - def node_instance(cls): - return cls.one_to_many_relationship('node_instance_fk') - - @declared_attr - def relationship_instance_fk(cls): - return cls.foreign_key(RelationshipInstanceBase, nullable=True) - - @declared_attr - def relationship_instance_name(cls): - return association_proxy('relationship_instance', cls.name_column_name()) - - @declared_attr - def relationship_instance(cls): - return cls.one_to_many_relationship('relationship_instance_fk') - - @declared_attr - def plugin_fk(cls): - return cls.foreign_key(PluginBase, nullable=True) - - @declared_attr - def plugin(cls): - return cls.one_to_many_relationship('plugin_fk') - - @declared_attr - def execution_fk(cls): - return cls.foreign_key(ExecutionBase, nullable=True) - - @declared_attr - def execution(cls): - return cls.one_to_many_relationship('execution_fk') - - @declared_attr - def execution_name(cls): - return association_proxy('execution', cls.name_column_name()) - - PENDING = 'pending' - RETRYING = 'retrying' - SENT = 'sent' - STARTED = 'started' - SUCCESS = 'success' - FAILED = 'failed' - STATES = ( - PENDING, - RETRYING, - SENT, - STARTED, - SUCCESS, - FAILED, - ) - - WAIT_STATES = [PENDING, RETRYING] - END_STATES = [SUCCESS, FAILED] - - RUNS_ON_SOURCE = 'source' - RUNS_ON_TARGET = 'target' - RUNS_ON_NODE_INSTANCE = 'node_instance' - RUNS_ON = (RUNS_ON_NODE_INSTANCE, RUNS_ON_SOURCE, RUNS_ON_TARGET) - - @orm.validates('max_attempts') - def validate_max_attempts(self, _, value): # pylint: disable=no-self-use - """Validates that max attempts is either -1 or a positive number""" - if value < 1 and value != TaskBase.INFINITE_RETRIES: - raise ValueError('Max attempts can be either -1 (infinite) or any positive number. ' - 'Got {value}'.format(value=value)) - return value - - INFINITE_RETRIES = -1 - - status = Column(Enum(*STATES, name='status'), default=PENDING) - - due_at = Column(DateTime, default=datetime.utcnow) - started_at = Column(DateTime, default=None) - ended_at = Column(DateTime, default=None) - max_attempts = Column(Integer, default=1) - retry_count = Column(Integer, default=0) - retry_interval = Column(Float, default=0) - ignore_failure = Column(Boolean, default=False) - - # Operation specific fields - operation_mapping = Column(String) - inputs = Column(Dict) - plugin_name = Column(String) - _runs_on = Column(Enum(*RUNS_ON, name='runs_on'), name='runs_on') - - @property - def actor(self): - """ - Return the actor of the task - :return: - """ - return self.node_instance or self.relationship_instance - - @property - def runs_on(self): - if self._runs_on == self.RUNS_ON_NODE_INSTANCE: - return self.node_instance - elif self._runs_on == self.RUNS_ON_SOURCE: - return self.relationship_instance.source_node_instance # pylint: disable=no-member - elif self._runs_on == self.RUNS_ON_TARGET: - return self.relationship_instance.target_node_instance # pylint: disable=no-member - return None - - @classmethod - def as_node_instance(cls, instance, runs_on, **kwargs): - return cls(node_instance=instance, _runs_on=runs_on, **kwargs) - - @classmethod - def as_relationship_instance(cls, instance, runs_on, **kwargs): - return cls(relationship_instance=instance, _runs_on=runs_on, **kwargs) - - @staticmethod - def abort(message=None): - raise TaskAbortException(message) - - @staticmethod - def retry(message=None, retry_interval=None): - raise TaskRetryException(message, retry_interval=retry_interval) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/storage/instrumentation.py ---------------------------------------------------------------------- diff --git a/aria/storage/instrumentation.py b/aria/storage/instrumentation.py index 537dbb5..7bcf35a 100644 --- a/aria/storage/instrumentation.py +++ b/aria/storage/instrumentation.py @@ -18,11 +18,11 @@ import copy import sqlalchemy.event from . import api -from . import model as _model +from .modeling import model as _model _STUB = object() _INSTRUMENTED = { - _model.NodeInstance.runtime_properties: dict + _model.Node.runtime_properties: dict } http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/storage/model.py ---------------------------------------------------------------------- diff --git a/aria/storage/model.py b/aria/storage/model.py deleted file mode 100644 index afca3e4..0000000 --- a/aria/storage/model.py +++ /dev/null @@ -1,110 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Aria's storage.models module -Path: aria.storage.models - -models module holds aria's models. - -classes: - * Field - represents a single field. - * IterField - represents an iterable field. - * Model - abstract model implementation. - * Snapshot - snapshots implementation model. - * Deployment - deployment implementation model. - * DeploymentUpdateStep - deployment update step implementation model. - * DeploymentUpdate - deployment update implementation model. - * DeploymentModification - deployment modification implementation model. - * Execution - execution implementation model. - * Node - node implementation model. - * Relationship - relationship implementation model. - * NodeInstance - node instance implementation model. - * RelationshipInstance - relationship instance implementation model. - * ProviderContext - provider context implementation model. - * Plugin - plugin implementation model. -""" -from sqlalchemy.ext.declarative import declarative_base - -from . import structure -from . import base_model as base - -__all__ = ( - 'Blueprint', - 'Deployment', - 'DeploymentUpdateStep', - 'DeploymentUpdate', - 'DeploymentModification', - 'Execution', - 'Node', - 'Relationship', - 'NodeInstance', - 'RelationshipInstance', - 'Plugin', -) - - -#pylint: disable=abstract-method -# The required abstract method implementation are implemented in the ModelIDMixin, which is used as -# a base to the DeclerativeBase. -DeclarativeBase = declarative_base(cls=structure.ModelIDMixin) - - -class Blueprint(DeclarativeBase, base.BlueprintBase): - pass - - -class Deployment(DeclarativeBase, base.DeploymentBase): - pass - - -class Execution(DeclarativeBase, base.ExecutionBase): - pass - - -class DeploymentUpdate(DeclarativeBase, base.DeploymentUpdateBase): - pass - - -class DeploymentUpdateStep(DeclarativeBase, base.DeploymentUpdateStepBase): - pass - - -class DeploymentModification(DeclarativeBase, base.DeploymentModificationBase): - pass - - -class Node(DeclarativeBase, base.NodeBase): - pass - - -class Relationship(DeclarativeBase, base.RelationshipBase): - pass - - -class NodeInstance(DeclarativeBase, base.NodeInstanceBase): - pass - - -class RelationshipInstance(DeclarativeBase, base.RelationshipInstanceBase): - pass - - -class Plugin(DeclarativeBase, base.PluginBase): - pass - - -class Task(DeclarativeBase, base.TaskBase): - pass http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/storage/modeling/__init__.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/__init__.py b/aria/storage/modeling/__init__.py new file mode 100644 index 0000000..1d5936e --- /dev/null +++ b/aria/storage/modeling/__init__.py @@ -0,0 +1,37 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from collections import namedtuple + +from . import ( + model, + instance_elements as _instance_base, + orchestrator_elements as _orchestrator_base, + template_elements as _template_base, +) +from .model import DB as declarative_base + +_ModelBaseCls = namedtuple('ModelBase', 'instance_elements,' + 'orchestrator_elements,' + 'template_elements') +model_base = _ModelBaseCls(instance_elements=_instance_base, + orchestrator_elements=_orchestrator_base, + template_elements=_template_base) + +__all__ = ( + 'model', + 'model_base', + 'declarative_base' +) http://git-wip-us.apache.org/repos/asf/incubator-ariatosca/blob/68a81882/aria/storage/modeling/elements.py ---------------------------------------------------------------------- diff --git a/aria/storage/modeling/elements.py b/aria/storage/modeling/elements.py new file mode 100644 index 0000000..caceb78 --- /dev/null +++ b/aria/storage/modeling/elements.py @@ -0,0 +1,157 @@ +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from sqlalchemy import ( + Column, + Text, +) + +from aria.utils.collections import OrderedDict +from aria.utils.console import puts + +from .utils import coerce_value +from . import ( + structure, + type, +) + +# pylint: disable=no-self-argument, no-member, abstract-method + + +class Function(object): + """ + An intrinsic function. + + Serves as a placeholder for a value that should eventually be derived + by calling the function. + """ + + @property + def as_raw(self): + raise NotImplementedError + + def _evaluate(self, context, container): + raise NotImplementedError + + def __deepcopy__(self, memo): + # Circumvent cloning in order to maintain our state + return self + + +class ElementBase(object): + """ + Base class for :class:`ServiceInstance` elements. + + All elements support validation, diagnostic dumping, and representation as + raw data (which can be translated into JSON or YAML) via :code:`as_raw`. + """ + + @property + def as_raw(self): + raise NotImplementedError + + def validate(self, context): + pass + + def coerce_values(self, context, container, report_issues): + pass + + def dump(self, context): + pass + + +class ModelElementBase(ElementBase): + """ + Base class for :class:`ServiceModel` elements. + + All model elements can be instantiated into :class:`ServiceInstance` elements. + """ + + def instantiate(self, context, container): + raise NotImplementedError + + +class ParameterBase(ModelElementBase, structure.ModelMixin): + """ + Represents a typed value. + + This class is used by both service model and service instance elements. + """ + __tablename__ = 'parameter' + name = Column(Text, nullable=False) + type = Column(Text, nullable=False) + + # Check: value type + value = Column(Text) + description = Column(Text) + + @property + def as_raw(self): + return OrderedDict(( + ('name', self.name), + ('type_name', self.type), + ('value', self._coerce_value()), + ('description', self.description))) + + # TODO: change name + def _coerce_value(self): + if self.type is None: + return + + if self.type.lower() == 'str': + return str(self.value) + elif self.type.lower() == 'int': + return int(self.value) + elif self.type.lower() == 'bool': + return bool(self.value) + elif self.type.lower() == 'float': + return float(self.value) + else: + raise Exception('No supported type_name was provided') + + def instantiate(self, context, container): + return ParameterBase(self.type_name, self.value, self.description) + + def coerce_values(self, context, container, report_issues): + if self.value is not None: + self.value = coerce_value(context, container, self.value, report_issues) + + +class MetadataBase(ModelElementBase, structure.ModelMixin): + """ + Custom values associated with the deployment template and its plans. + + This class is used by both service model and service instance elements. + + Properties: + + * :code:`values`: Dict of custom values + """ + values = Column(type.StrictDict(key_cls=basestring)) + + @property + def as_raw(self): + return self.values + + def instantiate(self, context, container): + metadata = MetadataBase() + metadata.values.update(self.values) + return metadata + + def dump(self, context): + puts('Metadata:') + with context.style.indent: + for name, value in self.values.iteritems(): + puts('%s: %s' % (name, context.style.meta(value)))