airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-3276) Google Cloud SQL database create / patch / delete operators
Date Fri, 02 Nov 2018 13:39:01 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-3276?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16673108#comment-16673108
] 

ASF GitHub Bot commented on AIRFLOW-3276:
-----------------------------------------

kaxil closed pull request #4124: [AIRFLOW-3276] Cloud SQL: database create / patch / delete
operators
URL: https://github.com/apache/incubator-airflow/pull/4124
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/example_dags/example_gcp_sql.py b/airflow/contrib/example_dags/example_gcp_sql.py
index a484456f6e..136c88c843 100644
--- a/airflow/contrib/example_dags/example_gcp_sql.py
+++ b/airflow/contrib/example_dags/example_gcp_sql.py
@@ -18,26 +18,30 @@
 # under the License.
 
 """
-Example Airflow DAG that deploys, updates, patches and deletes a Cloud SQL instance
-in Google Cloud Platform.
+Example Airflow DAG that creates, patches and deletes a Cloud SQL instance, and also
+creates, patches and deletes a database inside the instance, in Google Cloud Platform.
 
-This DAG relies on the following Airflow variables
-https://airflow.apache.org/concepts.html#variables
+This DAG relies on the following environment variables
 * PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
 * INSTANCE_NAME - Name of the Cloud SQL instance.
+* DB_NAME - Name of the database inside a Cloud SQL instance.
 """
 
+import os
 import datetime
 
 import airflow
 from airflow import models
 
 from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
-    CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator
+    CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \
+    CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \
+    CloudSqlInstanceDatabaseDeleteOperator
 
 # [START howto_operator_cloudsql_arguments]
-PROJECT_ID = models.Variable.get('PROJECT_ID', '')
-INSTANCE_NAME = models.Variable.get('INSTANCE_NAME', '')
+PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
+INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
+DB_NAME = os.environ.get('DB_NAME', 'testdb')
 # [END howto_operator_cloudsql_arguments]
 
 # Bodies below represent Cloud SQL instance resources:
@@ -97,6 +101,19 @@
     }
 }
 # [END howto_operator_cloudsql_patch_body]
+# [START howto_operator_cloudsql_db_create_body]
+db_create_body = {
+    "instance": INSTANCE_NAME,
+    "name": DB_NAME,
+    "project": PROJECT_ID
+}
+# [END howto_operator_cloudsql_db_create_body]
+# [START howto_operator_cloudsql_db_patch_body]
+db_patch_body = {
+    "charset": "utf16",
+    "collation": "utf16_general_ci"
+}
+# [END howto_operator_cloudsql_db_patch_body]
 
 default_args = {
     'start_date': airflow.utils.dates.days_ago(1)
@@ -123,6 +140,31 @@
         task_id='sql_instance_patch_task'
     )
     # [END howto_operator_cloudsql_patch]
+    # [START howto_operator_cloudsql_db_create]
+    sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
+        project_id=PROJECT_ID,
+        body=db_create_body,
+        instance=INSTANCE_NAME,
+        task_id='sql_db_create_task'
+    )
+    # [END howto_operator_cloudsql_db_create]
+    # [START howto_operator_cloudsql_db_patch]
+    sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
+        project_id=PROJECT_ID,
+        body=db_patch_body,
+        instance=INSTANCE_NAME,
+        database=DB_NAME,
+        task_id='sql_db_patch_task'
+    )
+    # [END howto_operator_cloudsql_db_patch]
+    # [START howto_operator_cloudsql_db_delete]
+    sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
+        project_id=PROJECT_ID,
+        instance=INSTANCE_NAME,
+        database=DB_NAME,
+        task_id='sql_db_delete_task'
+    )
+    # [END howto_operator_cloudsql_db_delete]
     # [START howto_operator_cloudsql_delete]
     sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
         project_id=PROJECT_ID,
@@ -131,4 +173,6 @@
     )
     # [END howto_operator_cloudsql_delete]
 
-    sql_instance_create_task >> sql_instance_patch_task >> sql_instance_delete_task
+    sql_instance_create_task >> sql_instance_patch_task \
+        >> sql_db_create_task >> sql_db_patch_task \
+        >> sql_db_delete_task >> sql_instance_delete_task
diff --git a/airflow/contrib/hooks/gcp_sql_hook.py b/airflow/contrib/hooks/gcp_sql_hook.py
index e0b3f92d8f..549ceaf49c 100644
--- a/airflow/contrib/hooks/gcp_sql_hook.py
+++ b/airflow/contrib/hooks/gcp_sql_hook.py
@@ -144,6 +144,96 @@ def delete_instance(self, project_id, instance):
         operation_name = response["name"]
         return self._wait_for_operation_to_complete(project_id, operation_name)
 
+    def get_database(self, project_id, instance, database):
+        """
+        Retrieves a database resource from a Cloud SQL instance.
+
+        :param project_id: Project ID of the project that contains the instance.
+        :type project_id: str
+        :param instance: Database instance ID. This does not include the project ID.
+        :type instance: str
+        :param database: Name of the database in the instance.
+        :type database: str
+        :return: A Cloud SQL database resource, as described in
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases#resource
+        :rtype: dict
+        """
+        return self.get_conn().databases().get(
+            project=project_id,
+            instance=instance,
+            database=database
+        ).execute(num_retries=NUM_RETRIES)
+
+    def create_database(self, project, instance, body):
+        """
+        Creates a new database inside a Cloud SQL instance.
+
+        :param project: Project ID of the project that contains the instance.
+        :type project: str
+        :param instance: Database instance ID. This does not include the project ID.
+        :type instance: str
+        :param body: The request body, as described in
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body
+        :type body: dict
+        :return: True if the operation succeeded, raises an error otherwise
+        :rtype: bool
+        """
+        response = self.get_conn().databases().insert(
+            project=project,
+            instance=instance,
+            body=body
+        ).execute(num_retries=NUM_RETRIES)
+        operation_name = response["name"]
+        return self._wait_for_operation_to_complete(project, operation_name)
+
+    def patch_database(self, project, instance, database, body):
+        """
+        Updates a database resource inside a Cloud SQL instance.
+        This method supports patch semantics.
+        See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
+
+        :param project: Project ID of the project that contains the instance.
+        :type project: str
+        :param instance: Database instance ID. This does not include the project ID.
+        :type instance: str
+        :param database: Name of the database to be updated in the instance.
+        :type database: str
+        :param body: The request body, as described in
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/patch#request-body
+        :type body: dict
+        :return: True if the operation succeeded, raises an error otherwise
+        :rtype: bool
+        """
+        response = self.get_conn().databases().patch(
+            project=project,
+            instance=instance,
+            database=database,
+            body=body
+        ).execute(num_retries=NUM_RETRIES)
+        operation_name = response["name"]
+        return self._wait_for_operation_to_complete(project, operation_name)
+
+    def delete_database(self, project, instance, database):
+        """
+        Deletes a database from a Cloud SQL instance.
+
+        :param project: Project ID of the project that contains the instance.
+        :type project: str
+        :param instance: Database instance ID. This does not include the project ID.
+        :type instance: str
+        :param database: Name of the database to be deleted in the instance.
+        :type database: str
+        :return: True if the operation succeeded, raises an error otherwise
+        :rtype: bool
+        """
+        response = self.get_conn().databases().delete(
+            project=project,
+            instance=instance,
+            database=database
+        ).execute(num_retries=NUM_RETRIES)
+        operation_name = response["name"]
+        return self._wait_for_operation_to_complete(project, operation_name)
+
     def _wait_for_operation_to_complete(self, project_id, operation_name):
         """
         Waits for the named operation to complete - checks status of the
diff --git a/airflow/contrib/operators/gcp_sql_operator.py b/airflow/contrib/operators/gcp_sql_operator.py
index 0ba7a300c9..aca2e8ad40 100644
--- a/airflow/contrib/operators/gcp_sql_operator.py
+++ b/airflow/contrib/operators/gcp_sql_operator.py
@@ -91,6 +91,45 @@
         ], optional=True),
     ], optional=True)
 ]
+CLOUD_SQL_EXPORT_VALIDATION = [
+    dict(name="exportContext", type="dict", fields=[
+        dict(name="fileType", allow_empty=False),
+        dict(name="uri", allow_empty=False),
+        dict(name="databases", type="list"),
+        dict(name="sqlExportOptions", type="dict", optional=True, fields=[
+            dict(name="tables", type="list"),
+            dict(name="schemaOnly")
+        ]),
+        dict(name="csvExportOptions", type="dict", optional=True, fields=[
+            dict(name="selectQuery")
+        ])
+    ])
+]
+CLOUD_SQL_IMPORT_VALIDATION = [
+    dict(name="importContext", type="dict", fields=[
+        dict(name="fileType", allow_empty=False),
+        dict(name="uri", allow_empty=False),
+        dict(name="database", optional=True, allow_empty=False),
+        dict(name="importUser", optional=True),
+        dict(name="csvImportOptions", type="dict", optional=True, fields=[
+            dict(name="table"),
+            dict(name="columns", type="list", optional=True)
+        ])
+    ])
+]
+CLOUD_SQL_DATABASE_INSERT_VALIDATION = [
+    dict(name="instance", allow_empty=False),
+    dict(name="name", allow_empty=False),
+    dict(name="project", allow_empty=False),
+]
+CLOUD_SQL_DATABASE_PATCH_VALIDATION = [
+    dict(name="instance", optional=True),
+    dict(name="name", optional=True),
+    dict(name="project", optional=True),
+    dict(name="etag", optional=True),
+    dict(name="charset", optional=True),
+    dict(name="collation", optional=True),
+]
 
 
 class CloudSqlBaseOperator(BaseOperator):
@@ -137,6 +176,15 @@ def _check_if_instance_exists(self, instance):
                 return False
             raise e
 
+    def _check_if_db_exists(self, db_name):
+        try:
+            return self._hook.get_database(self.project_id, self.instance, db_name)
+        except HttpError as e:
+            status = e.resp.status
+            if status == 404:
+                return False
+            raise e
+
     def execute(self, context):
         pass
 
@@ -162,7 +210,7 @@ class CloudSqlInstanceCreateOperator(CloudSqlBaseOperator):
     :type instance: str
     :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
     :type gcp_conn_id: str
-    :param api_version: API version used (e.g. v1).
+    :param api_version: API version used (e.g. v1beta4).
     :type api_version: str
     :param validate_body: True if body should be validated, False otherwise.
     :type validate_body: bool
@@ -226,7 +274,7 @@ class CloudSqlInstancePatchOperator(CloudSqlBaseOperator):
     :type instance: str
     :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
     :type gcp_conn_id: str
-    :param api_version: API version used (e.g. v1).
+    :param api_version: API version used (e.g. v1beta4).
     :type api_version: str
     """
     # [START gcp_sql_patch_template_fields]
@@ -270,7 +318,7 @@ class CloudSqlInstanceDeleteOperator(CloudSqlBaseOperator):
     :type instance: str
     :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
     :type gcp_conn_id: str
-    :param api_version: API version used (e.g. v1).
+    :param api_version: API version used (e.g. v1beta4).
     :type api_version: str
     """
     # [START gcp_sql_delete_template_fields]
@@ -295,3 +343,184 @@ def execute(self, context):
             return True
         else:
             return self._hook.delete_instance(self.project_id, self.instance)
+
+
+class CloudSqlInstanceDatabaseCreateOperator(CloudSqlBaseOperator):
+    """
+    Creates a new database inside a Cloud SQL instance.
+
+    :param project_id: Project ID of the project that contains the instance.
+    :type project_id: str
+    :param instance: Database instance ID. This does not include the project ID.
+    :type instance: str
+    :param body: The request body, as described in
+        https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body
+    :type body: dict
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: API version used (e.g. v1beta4).
+    :type api_version: str
+    :param validate_body: Whether the body should be validated. Defaults to True.
+    :type validate_body: bool
+    """
+    # [START gcp_sql_db_create_template_fields]
+    template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
+    # [END gcp_sql_db_create_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance,
+                 body,
+                 gcp_conn_id='google_cloud_default',
+                 api_version='v1beta4',
+                 validate_body=True,
+                 *args, **kwargs):
+        self.body = body
+        self.validate_body = validate_body
+        super(CloudSqlInstanceDatabaseCreateOperator, self).__init__(
+            project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
+            api_version=api_version, *args, **kwargs)
+
+    def _validate_inputs(self):
+        super(CloudSqlInstanceDatabaseCreateOperator, self)._validate_inputs()
+        if not self.body:
+            raise AirflowException("The required parameter 'body' is empty")
+
+    def _validate_body_fields(self):
+        if self.validate_body:
+            GcpBodyFieldValidator(CLOUD_SQL_DATABASE_INSERT_VALIDATION,
+                                  api_version=self.api_version).validate(self.body)
+
+    def execute(self, context):
+        self._validate_body_fields()
+        database = self.body.get("name")
+        if not database:
+            self.log.error("Body doesn't contain 'name'. Cannot check if the"
+                           " database already exists in the instance {}."
+                           .format(self.instance))
+            return False
+        if self._check_if_db_exists(database):
+            self.log.info("Cloud SQL instance with ID {} already contains database"
+                          " '{}'. Aborting database insert."
+                          .format(self.instance, database))
+            return True
+        else:
+            return self._hook.create_database(self.project_id, self.instance, self.body)
+
+
+class CloudSqlInstanceDatabasePatchOperator(CloudSqlBaseOperator):
+    """
+    Updates a resource containing information about a database inside a Cloud SQL
+    instance using patch semantics.
+    See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
+
+    :param project_id: Project ID of the project that contains the instance.
+    :type project_id: str
+    :param instance: Database instance ID. This does not include the project ID.
+    :type instance: str
+    :param database: Name of the database to be updated in the instance.
+    :type database: str
+    :param body: The request body, as described in
+        https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/patch#request-body
+    :type body: dict
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: API version used (e.g. v1beta4).
+    :type api_version: str
+    :param validate_body: Whether the body should be validated. Defaults to True.
+    :type validate_body: bool
+    """
+    # [START gcp_sql_db_patch_template_fields]
+    template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
+                       'api_version')
+    # [END gcp_sql_db_patch_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance,
+                 database,
+                 body,
+                 gcp_conn_id='google_cloud_default',
+                 api_version='v1beta4',
+                 validate_body=True,
+                 *args, **kwargs):
+        self.database = database
+        self.body = body
+        self.validate_body = validate_body
+        super(CloudSqlInstanceDatabasePatchOperator, self).__init__(
+            project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
+            api_version=api_version, *args, **kwargs)
+
+    def _validate_inputs(self):
+        super(CloudSqlInstanceDatabasePatchOperator, self)._validate_inputs()
+        if not self.body:
+            raise AirflowException("The required parameter 'body' is empty")
+        if not self.database:
+            raise AirflowException("The required parameter 'database' is empty")
+
+    def _validate_body_fields(self):
+        if self.validate_body:
+            GcpBodyFieldValidator(CLOUD_SQL_DATABASE_PATCH_VALIDATION,
+                                  api_version=self.api_version).validate(self.body)
+
+    def execute(self, context):
+        self._validate_body_fields()
+        if not self._check_if_db_exists(self.database):
+            raise AirflowException("Cloud SQL instance with ID {} does not contain "
+                                   "database '{}'. "
+                                   "Please specify another database to patch."
+                                   .format(self.instance, self.database))
+        else:
+            return self._hook.patch_database(self.project_id, self.instance,
+                                             self.database, self.body)
+
+
+class CloudSqlInstanceDatabaseDeleteOperator(CloudSqlBaseOperator):
+    """
+    Deletes a database from a Cloud SQL instance.
+
+    :param project_id: Project ID of the project that contains the instance.
+    :type project_id: str
+    :param instance: Database instance ID. This does not include the project ID.
+    :type instance: str
+    :param database: Name of the database to be deleted in the instance.
+    :type database: str
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: API version used (e.g. v1beta4).
+    :type api_version: str
+    """
+    # [START gcp_sql_db_delete_template_fields]
+    template_fields = ('project_id', 'instance', 'database', 'gcp_conn_id',
+                       'api_version')
+    # [END gcp_sql_db_delete_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance,
+                 database,
+                 gcp_conn_id='google_cloud_default',
+                 api_version='v1beta4',
+                 *args, **kwargs):
+        self.database = database
+        super(CloudSqlInstanceDatabaseDeleteOperator, self).__init__(
+            project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
+            api_version=api_version, *args, **kwargs)
+
+    def _validate_inputs(self):
+        super(CloudSqlInstanceDatabaseDeleteOperator, self)._validate_inputs()
+        if not self.database:
+            raise AirflowException("The required parameter 'database' is empty")
+
+    def execute(self, context):
+        if not self._check_if_db_exists(self.database):
+            self.log.info("Cloud SQL instance with ID {} does not contain database"
+                          " '{}'. Aborting database delete."
+                          .format(self.instance, self.database))
+            return True  # nothing to delete: treated as success, not as an error
+        else:
+            return self._hook.delete_database(self.project_id, self.instance,
+                                              self.database)
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index 025274a5a4..6333e32dd7 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -284,6 +284,148 @@ If the source code for your function is in Google Source Repository, make sure t
 your service account has the Source Repository Viewer role so that the source code
 can be downloaded if necessary.
 
+CloudSqlInstanceDatabaseCreateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Creates a new database inside a Cloud SQL instance.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseCreateOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_arguments]
+    :end-before: [END howto_operator_cloudsql_arguments]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_db_create]
+    :end-before: [END howto_operator_cloudsql_db_create]
+
+Example request body:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_db_create_body]
+    :end-before: [END howto_operator_cloudsql_db_create_body]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+  :language: python
+  :dedent: 4
+  :start-after: [START gcp_sql_db_create_template_fields]
+  :end-before: [END gcp_sql_db_create_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud SQL API documentation for database insert
+<https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert>`_.
+
+CloudSqlInstanceDatabaseDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Deletes a database from a Cloud SQL instance.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseDeleteOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_arguments]
+    :end-before: [END howto_operator_cloudsql_arguments]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_db_delete]
+    :end-before: [END howto_operator_cloudsql_db_delete]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+  :language: python
+  :dedent: 4
+  :start-after: [START gcp_sql_db_delete_template_fields]
+  :end-before: [END gcp_sql_db_delete_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud SQL API documentation for database delete
+<https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/delete>`_.
+
+CloudSqlInstanceDatabasePatchOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Updates a resource containing information about a database inside a Cloud SQL instance
+using patch semantics.
+See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabasePatchOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_arguments]
+    :end-before: [END howto_operator_cloudsql_arguments]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_db_patch]
+    :end-before: [END howto_operator_cloudsql_db_patch]
+
+Example request body:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_db_patch_body]
+    :end-before: [END howto_operator_cloudsql_db_patch_body]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+  :language: python
+  :dedent: 4
+  :start-after: [START gcp_sql_db_patch_template_fields]
+  :end-before: [END gcp_sql_db_patch_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud SQL API documentation for database patch
+<https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/patch>`_.
+
 CloudSqlInstanceDeleteOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
@@ -295,7 +437,7 @@ For parameter definition take a look at
 Arguments
 """""""""
 
-Some arguments in the example DAG are taken from Airflow variables:
+Some arguments in the example DAG are taken from environment variables:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
     :language: python
@@ -342,7 +484,7 @@ will succeed.
 Arguments
 """""""""
 
-Some arguments in the example DAG are taken from Airflow variables:
+Some arguments in the example DAG are taken from environment variables:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
     :language: python
@@ -398,7 +540,7 @@ unchanged.
 Arguments
 """""""""
 
-Some arguments in the example DAG are taken from Airflow variables:
+Some arguments in the example DAG are taken from environment variables:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
     :language: python
diff --git a/docs/integration.rst b/docs/integration.rst
index 34e062437c..cb05e94169 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -515,10 +515,37 @@ Cloud SQL
 Cloud SQL Operators
 """""""""""""""""""
 
+- :ref:`CloudSqlInstanceDatabaseDeleteOperator` : deletes a database from a Cloud SQL
+  instance.
+- :ref:`CloudSqlInstanceDatabaseCreateOperator` : creates a new database inside a Cloud
+  SQL instance.
+- :ref:`CloudSqlInstanceDatabasePatchOperator` : updates a database inside a Cloud
+  SQL instance.
 - :ref:`CloudSqlInstanceDeleteOperator` : delete a Cloud SQL instance.
 - :ref:`CloudSqlInstanceCreateOperator` : create a new Cloud SQL instance.
 - :ref:`CloudSqlInstancePatchOperator` : patch a Cloud SQL instance.
 
+.. _CloudSqlInstanceDatabaseDeleteOperator:
+
+CloudSqlInstanceDatabaseDeleteOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseDeleteOperator
+
+.. _CloudSqlInstanceDatabaseCreateOperator:
+
+CloudSqlInstanceDatabaseCreateOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseCreateOperator
+
+.. _CloudSqlInstanceDatabasePatchOperator:
+
+CloudSqlInstanceDatabasePatchOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabasePatchOperator
+
 .. CloudSqlInstanceDeleteOperator:
 
 CloudSqlInstanceDeleteOperator
@@ -536,12 +563,12 @@ CloudSqlInstanceCreateOperator
 .. CloudSqlInstancePatchOperator:
 
 CloudSqlInstancePatchOperator
-^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstancePatchOperator
 
 Cloud SQL Hook
-""""""""""""""""""""
+""""""""""""""
 
 .. autoclass:: airflow.contrib.hooks.gcp_sql_hook.CloudSqlHook
     :members:
@@ -566,14 +593,14 @@ GceInstanceStartOperator
 .. _GceInstanceStopOperator:
 
 GceInstanceStopOperator
-^^^^^^^^^^^^^^^^^^^^^^^^
+^^^^^^^^^^^^^^^^^^^^^^^
 
 .. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceInstanceStopOperator
 
 .. _GceSetMachineTypeOperator:
 
 GceSetMachineTypeOperator
-^^^^^^^^^^^^^^^^^^^^^^^^
+^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. autoclass:: airflow.contrib.operators.gcp_compute_operator.GceSetMachineTypeOperator
 
diff --git a/tests/contrib/operators/test_gcp_sql_operator.py b/tests/contrib/operators/test_gcp_sql_operator.py
index 245631808a..31ed3d37c3 100644
--- a/tests/contrib/operators/test_gcp_sql_operator.py
+++ b/tests/contrib/operators/test_gcp_sql_operator.py
@@ -21,7 +21,9 @@
 
 from airflow import AirflowException
 from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
-    CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator
+    CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \
+    CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \
+    CloudSqlInstanceDatabaseDeleteOperator
 
 try:
     # noinspection PyProtectedMember
@@ -34,6 +36,7 @@
 
 PROJECT_ID = "project-id"
 INSTANCE_NAME = "test-name"
+DB_NAME = "db1"
 CREATE_BODY = {
     "name": INSTANCE_NAME,
     "settings": {
@@ -109,6 +112,21 @@
     },
     "region": "europe-west4"
 }
+DATABASE_INSERT_BODY = {
+    "name": DB_NAME,            # The name of the database in the Cloud SQL instance.
+                                # This does not include the project ID or instance name.
+
+    "project": PROJECT_ID,      # The project ID of the project containing the Cloud SQL
+                                # database. The Google apps domain is prefixed if
+                                # applicable.
+
+    "instance": INSTANCE_NAME,  # The name of the Cloud SQL instance.
+                                # This does not include the project ID.
+}
+DATABASE_PATCH_BODY = {
+    "charset": "utf16",
+    "collation": "utf16_general_ci"
+}
 
 
 class CloudSqlTest(unittest.TestCase):
@@ -312,3 +330,135 @@ def test_instance_delete_should_abort_and_succeed_if_not_exists(
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.delete_instance.assert_not_called()
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstanceDatabaseCreateOperator._check_if_db_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_db_create(self, mock_hook, _check_if_db_exists):
+        _check_if_db_exists.return_value = False
+        op = CloudSqlInstanceDatabaseCreateOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            body=DATABASE_INSERT_BODY,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.create_database.assert_called_once_with(
+            PROJECT_ID, INSTANCE_NAME, DATABASE_INSERT_BODY
+        )
+        self.assertTrue(result)
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstanceDatabaseCreateOperator._check_if_db_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_db_create_should_abort_and_succeed_if_exists(
+            self, mock_hook, _check_if_db_exists):
+        _check_if_db_exists.return_value = True
+        op = CloudSqlInstanceDatabaseCreateOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            body=DATABASE_INSERT_BODY,
+            task_id="id"
+        )
+        result = op.execute(None)
+        self.assertTrue(result)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.create_database.assert_not_called()
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstanceDatabasePatchOperator._check_if_db_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_db_patch(self, mock_hook, _check_if_db_exists):
+        _check_if_db_exists.return_value = True
+        op = CloudSqlInstanceDatabasePatchOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            database=DB_NAME,
+            body=DATABASE_PATCH_BODY,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.patch_database.assert_called_once_with(
+            PROJECT_ID, INSTANCE_NAME, DB_NAME, DATABASE_PATCH_BODY
+        )
+        self.assertTrue(result)
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstanceDatabasePatchOperator._check_if_db_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_db_patch_should_throw_ex_if_not_exists(
+            self, mock_hook, _check_if_db_exists):
+        _check_if_db_exists.return_value = False
+        with self.assertRaises(AirflowException) as cm:
+            op = CloudSqlInstanceDatabasePatchOperator(
+                project_id=PROJECT_ID,
+                instance=INSTANCE_NAME,
+                database=DB_NAME,
+                body=DATABASE_PATCH_BODY,
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn("Cloud SQL instance with ID", str(err))
+        self.assertIn("does not contain database", str(err))
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.patch_database.assert_not_called()
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_db_patch_should_throw_ex_when_empty_database(self, mock_hook):
+        with self.assertRaises(AirflowException) as cm:
+            op = CloudSqlInstanceDatabasePatchOperator(
+                project_id=PROJECT_ID,
+                instance=INSTANCE_NAME,
+                database="",
+                body=DATABASE_INSERT_BODY,
+                task_id="id"
+            )
+            op.execute(None)
+        err = cm.exception
+        self.assertIn("The required parameter 'database' is empty", str(err))
+        mock_hook.assert_not_called()
+        mock_hook.return_value.patch_database.assert_not_called()
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstanceDatabaseDeleteOperator._check_if_db_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_db_delete(self, mock_hook, _check_if_db_exists):
+        _check_if_db_exists.return_value = True
+        op = CloudSqlInstanceDatabaseDeleteOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            database=DB_NAME,
+            task_id="id"
+        )
+        result = op.execute(None)
+        self.assertTrue(result)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.delete_database.assert_called_once_with(
+            PROJECT_ID, INSTANCE_NAME, DB_NAME
+        )
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator"
+                ".CloudSqlInstanceDatabaseDeleteOperator._check_if_db_exists")
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_db_delete_should_abort_and_succeed_if_not_exists(
+            self, mock_hook, _check_if_db_exists):
+        _check_if_db_exists.return_value = False
+        op = CloudSqlInstanceDatabaseDeleteOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            database=DB_NAME,
+            task_id="id"
+        )
+        result = op.execute(None)
+        self.assertTrue(result)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.delete_database.assert_not_called()


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Google Cloud SQL database create / patch / delete operators
> -----------------------------------------------------------
>
>                 Key: AIRFLOW-3276
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3276
>             Project: Apache Airflow
>          Issue Type: New Feature
>            Reporter: Szymon Przedwojski
>            Assignee: Szymon Przedwojski
>            Priority: Minor
>
> Operators that allow invoking Google Cloud SQL's database methods:
> - CloudSqlInstanceDatabaseCreateOperator ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert])
> - CloudSqlInstanceDatabasePatchOperator ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/patch])
> - CloudSqlInstanceDatabaseDeleteOperator ([https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/delete])



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Mime
View raw message