airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] kaxil commented on a change in pull request #4167: [AIRFLOW-3220] Implement Instance Group Manager Operators for GCE
Date Sun, 11 Nov 2018 21:52:46 GMT
kaxil commented on a change in pull request #4167: [AIRFLOW-3220] Implement Instance Group
Manager Operators for GCE
URL: https://github.com/apache/incubator-airflow/pull/4167#discussion_r232505563
 
 

 ##########
 File path: airflow/contrib/operators/gcp_compute_operator.py
 ##########
 @@ -181,3 +195,241 @@ def execute(self, context):
         self._validate_all_body_fields()
         return self._hook.set_machine_type(self.project_id, self.zone,
                                            self.resource_id, self.body)
+
+
+GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION = [
+    dict(name="name", regexp="^.+$"),
+    dict(name="description", optional=True),
+    dict(name="properties", type='dict', optional=True, fields=[
+        dict(name="description", optional=True),
+        dict(name="tags", optional=True, fields=[
+            dict(name="items", optional=True)
+        ]),
+        dict(name="machineType", optional=True),
+        dict(name="canIpForward", optional=True),
+        dict(name="networkInterfaces", optional=True),  # not validating deeper
+        dict(name="disks", optional=True),  # not validating the array deeper
+        dict(name="metadata", optional=True, fields=[
+            dict(name="fingerprint", optional=True),
+            dict(name="items", optional=True),
+            dict(name="kind", optional=True),
+        ]),
+        dict(name="serviceAccounts", optional=True),  # not validating deeper
+        dict(name="scheduling", optional=True, fields=[
+            dict(name="onHostMaintenance", optional=True),
+            dict(name="automaticRestart", optional=True),
+            dict(name="preemptible", optional=True),
+            dict(name="nodeAffinitites", optional=True),  # not validating deeper
+        ]),
+        dict(name="labels", optional=True),
+        dict(name="guestAccelerators", optional=True),  # not validating deeper
+        dict(name="minCpuPlatform", optional=True),
+    ]),
+]
+
+# Dotted field paths that GceInstanceTemplateCopyOperator strips (through
+# GcpBodyFieldSanitizer) from the copied template body before merging in
+# body_patch and inserting the result as a new template. Presumably these are
+# server-assigned / output-only fields of the instanceTemplates resource --
+# NOTE(review): confirm against the Compute Engine API reference when
+# extending this list.
+GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE = [
+    "kind",
+    "id",
+    "name",
+    "creationTimestamp",
+    "properties.disks.sha256",
+    "properties.disks.kind",
+    "properties.disks.sourceImageEncryptionKey.sha256",
+    "properties.disks.index",
+    "properties.disks.licenses",
+    "properties.networkInterfaces.kind",
+    "properties.networkInterfaces.accessConfigs.kind",
+    "properties.networkInterfaces.name",
+    "properties.metadata.kind",
+    "selfLink"
+]
+
+
+class GceInstanceTemplateCopyOperator(GceBaseOperator):
+    """
+    Copies the instance template, applying specified changes.
+
+    :param project_id: Google Cloud Platform Project ID where the Compute Engine
+        instance exists.
+    :type project_id: str
+    :param resource_id: Name of the Instance Template
+    :type resource_id: str
+    :param body_patch: Patch to the body of instanceTemplates object following rfc7386
+            PATCH semantics. The body_patch content follows
+            https://cloud.google.com/compute/docs/reference/rest/v1/instanceTemplates
+            Name field is required as we need to rename the template,
+            all the other fields are optional. It is important to follow PATCH semantics
+            - arrays are replaced fully, so if you need to update an array you should
+            provide the whole target array as patch element.
+    :type body_patch: dict
+    :param request_id: Optional, unique request_id that you might add to achieve
+           full idempotence (for example when client call times out repeating the request
+           with the same request id will not create a new instance template again).
+           It should be in UUID format as defined in RFC 4122.
+    :type request_id: str
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: API version used (for example v1 or beta).
+    :type api_version: str
+    :param validate_body: If set to False, body validation is not performed.
+    :type validate_body: bool
+    """
+    # [START gce_instance_template_copy_operator_template_fields]
+    template_fields = ('project_id', 'resource_id', 'request_id',
+                       'gcp_conn_id', 'api_version')
+    # [END gce_instance_template_copy_operator_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 resource_id,
+                 body_patch,
+                 request_id=None,
+                 gcp_conn_id='google_cloud_default',
+                 api_version='v1',
+                 validate_body=True,
+                 *args, **kwargs):
+        self.body_patch = body_patch
+        self.request_id = request_id
+        self._field_validator = None
+        if 'name' not in self.body_patch:
+            raise AirflowException("The body '{}' should contain at least "
+                                   "name for the new operator in the 'name' field".
+                                   format(body_patch))
+        if validate_body:
+            self._field_validator = GcpBodyFieldValidator(
+                GCE_INSTANCE_TEMPLATE_VALIDATION_PATCH_SPECIFICATION, api_version=api_version)
+        self._field_sanitizer = GcpBodyFieldSanitizer(
+            GCE_INSTANCE_TEMPLATE_FIELDS_TO_SANITIZE)
+        super(GceInstanceTemplateCopyOperator, self).__init__(
+            project_id=project_id, zone='global', resource_id=resource_id,
+            gcp_conn_id=gcp_conn_id, api_version=api_version, *args, **kwargs)
+
+    def _validate_all_body_fields(self):
+        if self._field_validator:
+            self._field_validator.validate(self.body_patch)
+
+    def execute(self, context):
+        self._validate_all_body_fields()
+        try:
+            # Idempotence check (sort of) - we want to check if the new template
+            # is already created and if is, then we assume it was created by previous run
+            # of CopyTemplate operator - we do not check if content of the template
+            # is as expected. Templates are immutable so we cannot update it anyway
+            # and deleting/recreating is not worth the hassle especially
+            # that we cannot delete template if it is already used in some Instance
+            # Group Manager. We assume success if the template is simply present
+            existing_template = self._hook.get_instance_template(
+                project_id=self.project_id, resource_id=self.body_patch['name'])
+            self.log.info("The {} template already existed. It was likely "
+                          "created by previous run of the operator. Assuming success.")
+            return existing_template
+        except HttpError as e:
+            # We actually expect to get 404 / Not Found here as the template should
+            # not yet exist
+            if not e.resp.status == 404:
+                raise e
+        old_body = self._hook.get_instance_template(project_id=self.project_id,
+                                                    resource_id=self.resource_id)
+        new_body = deepcopy(old_body)
+        self._field_sanitizer.sanitize(new_body)
+        new_body = merge(new_body, self.body_patch)
+        self.log.info("Calling insert instance template with updated body: {}".
+                      format(new_body))
+        self._hook.insert_instance_template(project_id=self.project_id,
+                                            body=new_body,
+                                            request_id=self.request_id)
+        return self._hook.get_instance_template(project_id=self.project_id,
+                                                resource_id=self.body_patch['name'])
+
+
+class GceInstanceGroupManagerUpdateTemplateOperator(GceBaseOperator):
+    """
+    Patches the Instance Group Manager, replacing source template URL with the
+    destination one. API V1 does not have update/patch operations for Instance
+    Group Manager, so you must use beta or newer API version. Beta is the default.
+
+    :param project_id: Google Cloud Platform Project ID where the Compute Engine
+        instance exists.
+    :type project_id: str
+    :param resource_id: Name of the Instance Group Manager
+    :type resource_id: str
+    :param zone: Google Cloud Platform zone where the Instance Group Manager exists.
+    :type zone: str
+    :param request_id: Optional, unique request_id that you might add to achieve
+           full idempotence (for example when client call times out repeating the request
+           with the same request id will not create a new instance template again).
 
 Review comment:
  Can we please make the indentation of all the docstrings in this PR consistent? Continuation
lines only need a single level of indentation (one indent).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message