airflow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From GitBox <...@apache.org>
Subject [GitHub] kaxil closed pull request #4251: [AIRFLOW-2440] Add Google Cloud SQL import/export operator
Date Wed, 05 Dec 2018 20:33:03 GMT
kaxil closed pull request #4251: [AIRFLOW-2440] Add Google Cloud SQL import/export operator
URL: https://github.com/apache/incubator-airflow/pull/4251
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/airflow/contrib/example_dags/example_gcp_sql.py b/airflow/contrib/example_dags/example_gcp_sql.py
index 45c0895b0f..c6838a2baf 100644
--- a/airflow/contrib/example_dags/example_gcp_sql.py
+++ b/airflow/contrib/example_dags/example_gcp_sql.py
@@ -29,22 +29,32 @@
 """
 
 import os
-import datetime
+
+import re
 
 import airflow
 from airflow import models
-
 from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
     CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \
     CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \
-    CloudSqlInstanceDatabaseDeleteOperator
+    CloudSqlInstanceDatabaseDeleteOperator, CloudSqlInstanceExportOperator, \
+    CloudSqlInstanceImportOperator
+from airflow.contrib.operators.gcs_acl_operator import \
+    GoogleCloudStorageBucketCreateAclEntryOperator, \
+    GoogleCloudStorageObjectCreateAclEntryOperator
 
 # [START howto_operator_cloudsql_arguments]
 PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testpostgres')
+INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'test-mysql')
+INSTANCE_NAME2 = os.environ.get('INSTANCE_NAME2', 'test-mysql2')
 DB_NAME = os.environ.get('DB_NAME', 'testdb')
 # [END howto_operator_cloudsql_arguments]
 
+# [START howto_operator_cloudsql_export_import_arguments]
+EXPORT_URI = os.environ.get('EXPORT_URI', 'gs://bucketName/fileName')
+IMPORT_URI = os.environ.get('IMPORT_URI', 'gs://bucketName/fileName')
+# [END howto_operator_cloudsql_export_import_arguments]
+
 # Bodies below represent Cloud SQL instance resources:
 # https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances
 
@@ -86,6 +96,16 @@
     "region": "europe-west4",
 }
 # [END howto_operator_cloudsql_create_body]
+
+body2 = {
+    "name": INSTANCE_NAME2,
+    "settings": {
+        "tier": "db-n1-standard-1",
+    },
+    "databaseVersion": "MYSQL_5_7",
+    "region": "europe-west4",
+}
+
 # [START howto_operator_cloudsql_patch_body]
 patch_body = {
     "name": INSTANCE_NAME,
@@ -102,6 +122,25 @@
     }
 }
 # [END howto_operator_cloudsql_patch_body]
+# [START howto_operator_cloudsql_export_body]
+export_body = {
+    "exportContext": {
+        "fileType": "sql",
+        "uri": EXPORT_URI,
+        "sqlExportOptions": {
+            "schemaOnly": False
+        }
+    }
+}
+# [END howto_operator_cloudsql_export_body]
+# [START howto_operator_cloudsql_import_body]
+import_body = {
+    "importContext": {
+        "fileType": "sql",
+        "uri": IMPORT_URI
+    }
+}
+# [END howto_operator_cloudsql_import_body]
 # [START howto_operator_cloudsql_db_create_body]
 db_create_body = {
     "instance": INSTANCE_NAME,
@@ -123,16 +162,40 @@
 with models.DAG(
     'example_gcp_sql',
     default_args=default_args,
-    schedule_interval=datetime.timedelta(days=1)
+    schedule_interval=None
 ) as dag:
+    prev_task = None
+
+    def next_dep(task, prev):
+        prev >> task
+        return task
+
+    # ############################################## #
+    # ### INSTANCES SET UP ######################### #
+    # ############################################## #
+
     # [START howto_operator_cloudsql_create]
-    sql_instance_create_task = CloudSqlInstanceCreateOperator(
+    sql_instance_create = CloudSqlInstanceCreateOperator(
         project_id=PROJECT_ID,
         body=body,
         instance=INSTANCE_NAME,
-        task_id='sql_instance_create_task'
+        task_id='sql_instance_create'
     )
     # [END howto_operator_cloudsql_create]
+    prev_task = sql_instance_create
+
+    sql_instance_create_2 = CloudSqlInstanceCreateOperator(
+        project_id=PROJECT_ID,
+        body=body2,
+        instance=INSTANCE_NAME2,
+        task_id='sql_instance_create_2'
+    )
+    prev_task = next_dep(sql_instance_create_2, prev_task)
+
+    # ############################################## #
+    # ### MODIFYING INSTANCE AND ITS DATABASE ###### #
+    # ############################################## #
+
     # [START howto_operator_cloudsql_patch]
     sql_instance_patch_task = CloudSqlInstancePatchOperator(
         project_id=PROJECT_ID,
@@ -141,6 +204,8 @@
         task_id='sql_instance_patch_task'
     )
     # [END howto_operator_cloudsql_patch]
+    prev_task = next_dep(sql_instance_patch_task, prev_task)
+
     # [START howto_operator_cloudsql_db_create]
     sql_db_create_task = CloudSqlInstanceDatabaseCreateOperator(
         project_id=PROJECT_ID,
@@ -149,6 +214,8 @@
         task_id='sql_db_create_task'
     )
     # [END howto_operator_cloudsql_db_create]
+    prev_task = next_dep(sql_db_create_task, prev_task)
+
     # [START howto_operator_cloudsql_db_patch]
     sql_db_patch_task = CloudSqlInstanceDatabasePatchOperator(
         project_id=PROJECT_ID,
@@ -158,6 +225,65 @@
         task_id='sql_db_patch_task'
     )
     # [END howto_operator_cloudsql_db_patch]
+    prev_task = next_dep(sql_db_patch_task, prev_task)
+
+    # ############################################## #
+    # ### EXPORTING SQL FROM INSTANCE 1 ############ #
+    # ############################################## #
+
+    # For export to work we need to grant the Cloud SQL instance's Service Account
+    # write access to the destination GCS bucket.
+    # [START howto_operator_cloudsql_export_gcs_permissions]
+    sql_gcp_add_bucket_permission = GoogleCloudStorageBucketCreateAclEntryOperator(
+        entity="user-{{ task_instance.xcom_pull('sql_instance_create', key='service_account_email') }}",
+        role="WRITER",
+        bucket=re.match(r'gs:\/\/(\S*)\/', EXPORT_URI).group(1),
+        task_id='sql_gcp_add_bucket_permission'
+    )
+    # [END howto_operator_cloudsql_export_gcs_permissions]
+    prev_task = next_dep(sql_gcp_add_bucket_permission, prev_task)
+
+    # [START howto_operator_cloudsql_export]
+    sql_export_task = CloudSqlInstanceExportOperator(
+        project_id=PROJECT_ID,
+        body=export_body,
+        instance=INSTANCE_NAME,
+        task_id='sql_export_task'
+    )
+    # [END howto_operator_cloudsql_export]
+    prev_task = next_dep(sql_export_task, prev_task)
+
+    # ############################################## #
+    # ### IMPORTING SQL TO INSTANCE 2 ############## #
+    # ############################################## #
+
+    # For import to work we need to grant the Cloud SQL instance's Service Account
+    # read access to the target GCS object.
+    # [START howto_operator_cloudsql_import_gcs_permissions]
+    sql_gcp_add_object_permission = GoogleCloudStorageObjectCreateAclEntryOperator(
+        entity="user-{{ task_instance.xcom_pull('sql_instance_create_2', key='service_account_email') }}",
+        role="READER",
+        bucket=re.match(r'gs:\/\/(\S*)\/', IMPORT_URI).group(1),
+        object_name=re.match(r'gs:\/\/[^\/]*\/(\S*)', IMPORT_URI).group(1),
+        task_id='sql_gcp_add_object_permission',
+    )
+    # [END howto_operator_cloudsql_import_gcs_permissions]
+    prev_task = next_dep(sql_gcp_add_object_permission, prev_task)
+
+    # [START howto_operator_cloudsql_import]
+    sql_import_task = CloudSqlInstanceImportOperator(
+        project_id=PROJECT_ID,
+        body=import_body,
+        instance=INSTANCE_NAME2,
+        task_id='sql_import_task'
+    )
+    # [END howto_operator_cloudsql_import]
+    prev_task = next_dep(sql_import_task, prev_task)
+
+    # ############################################## #
+    # ### DELETING A DATABASE FROM AN INSTANCE ##### #
+    # ############################################## #
+
     # [START howto_operator_cloudsql_db_delete]
     sql_db_delete_task = CloudSqlInstanceDatabaseDeleteOperator(
         project_id=PROJECT_ID,
@@ -166,6 +292,12 @@
         task_id='sql_db_delete_task'
     )
     # [END howto_operator_cloudsql_db_delete]
+    prev_task = next_dep(sql_db_delete_task, prev_task)
+
+    # ############################################## #
+    # ### INSTANCES TEAR DOWN ###################### #
+    # ############################################## #
+
     # [START howto_operator_cloudsql_delete]
     sql_instance_delete_task = CloudSqlInstanceDeleteOperator(
         project_id=PROJECT_ID,
@@ -173,7 +305,11 @@
         task_id='sql_instance_delete_task'
     )
     # [END howto_operator_cloudsql_delete]
+    prev_task = next_dep(sql_instance_delete_task, prev_task)
 
-    sql_instance_create_task >> sql_instance_patch_task \
-        >> sql_db_create_task >> sql_db_patch_task \
-        >> sql_db_delete_task >> sql_instance_delete_task
+    sql_instance_delete_task_2 = CloudSqlInstanceDeleteOperator(
+        project_id=PROJECT_ID,
+        instance=INSTANCE_NAME2,
+        task_id='sql_instance_delete_task_2'
+    )
+    prev_task = next_dep(sql_instance_delete_task_2, prev_task)
diff --git a/airflow/contrib/hooks/gcp_sql_hook.py b/airflow/contrib/hooks/gcp_sql_hook.py
index aee0098b07..4e87f2305b 100644
--- a/airflow/contrib/hooks/gcp_sql_hook.py
+++ b/airflow/contrib/hooks/gcp_sql_hook.py
@@ -27,6 +27,7 @@
 import time
 import uuid
 from os.path import isfile
+from googleapiclient import errors
 from subprocess import Popen, PIPE
 from six.moves.urllib.parse import quote_plus
 
@@ -254,6 +255,64 @@ def delete_database(self, project, instance, database):
         operation_name = response["name"]
         return self._wait_for_operation_to_complete(project, operation_name)
 
+    def export_instance(self, project_id, instance_id, body):
+        """
+        Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump
+        or CSV file.
+
+        :param project_id: Project ID of the project where the instance exists.
+        :type project_id: str
+        :param instance_id: Name of the Cloud SQL instance. This does not include the
+            project ID.
+        :type instance_id: str
+        :param body: The request body, as described in
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
+        :type body: dict
+        :return: True if the operation succeeded, raises an error otherwise
+        :rtype: bool
+        """
+        try:
+            response = self.get_conn().instances().export(
+                project=project_id,
+                instance=instance_id,
+                body=body
+            ).execute(num_retries=NUM_RETRIES)
+            operation_name = response["name"]
+            return self._wait_for_operation_to_complete(project_id, operation_name)
+        except errors.HttpError as ex:
+            raise AirflowException(
+                'Exporting instance {} failed: {}'.format(instance_id, ex.content)
+            )
+
+    def import_instance(self, project_id, instance_id, body):
+        """
+        Imports data into a Cloud SQL instance from a SQL dump or CSV file in
+        Cloud Storage.
+
+        :param project_id: Project ID of the project where the instance exists.
+        :type project_id: str
+        :param instance_id: Name of the Cloud SQL instance. This does not include the
+            project ID.
+        :type instance_id: str
+        :param body: The request body, as described in
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/import#request-body
+        :type body: dict
+        :return: True if the operation succeeded, raises an error otherwise
+        :rtype: bool
+        """
+        try:
+            response = self.get_conn().instances().import_(
+                project=project_id,
+                instance=instance_id,
+                body=body
+            ).execute(num_retries=NUM_RETRIES)
+            operation_name = response["name"]
+            return self._wait_for_operation_to_complete(project_id, operation_name)
+        except errors.HttpError as ex:
+            raise AirflowException(
+                'Importing instance {} failed: {}'.format(instance_id, ex.content)
+            )
+
     def _wait_for_operation_to_complete(self, project_id, operation_name):
         """
         Waits for the named operation to complete - checks status of the
diff --git a/airflow/contrib/hooks/gcs_hook.py b/airflow/contrib/hooks/gcs_hook.py
index f848d25dce..6499ea38b5 100644
--- a/airflow/contrib/hooks/gcs_hook.py
+++ b/airflow/contrib/hooks/gcs_hook.py
@@ -577,14 +577,9 @@ def insert_bucket_acl(self, bucket, entity, role, user_project):
         :param bucket: Name of a bucket.
         :type bucket: str
         :param entity: The entity holding the permission, in one of the following forms:
-        - user-userId
-        - user-email
-        - group-groupId
-        - group-email
-        - domain-domain
-        - project-team-projectId
-        - allUsers
-        - allAuthenticatedUsers
+            user-userId, user-email, group-groupId, group-email, domain-domain,
+            project-team-projectId, allUsers, allAuthenticatedUsers.
+            See: https://cloud.google.com/storage/docs/access-control/lists#scopes
         :type entity: str
         :param role: The access permission for the entity.
             Acceptable values are: "OWNER", "READER", "WRITER".
@@ -625,14 +620,9 @@ def insert_object_acl(self, bucket, object_name, entity, role, generation,
             https://cloud.google.com/storage/docs/json_api/#encoding
         :type object_name: str
         :param entity: The entity holding the permission, in one of the following forms:
-            - user-userId
-            - user-email
-            - group-groupId
-            - group-email
-            - domain-domain
-            - project-team-projectId
-            - allUsers
-            - allAuthenticatedUsers
+            user-userId, user-email, group-groupId, group-email, domain-domain,
+            project-team-projectId, allUsers, allAuthenticatedUsers
+            See: https://cloud.google.com/storage/docs/access-control/lists#scopes
         :type entity: str
         :param role: The access permission for the entity.
             Acceptable values are: "OWNER", "READER".
diff --git a/airflow/contrib/operators/gcp_sql_operator.py b/airflow/contrib/operators/gcp_sql_operator.py
index 711f2c8d15..abdefb5190 100644
--- a/airflow/contrib/operators/gcp_sql_operator.py
+++ b/airflow/contrib/operators/gcp_sql_operator.py
@@ -27,7 +27,7 @@
 SETTINGS = 'settings'
 SETTINGS_VERSION = 'settingsVersion'
 
-CLOUD_SQL_VALIDATION = [
+CLOUD_SQL_CREATE_VALIDATION = [
     dict(name="name", allow_empty=False),
     dict(name="settings", type="dict", fields=[
         dict(name="tier", allow_empty=False),
@@ -95,10 +95,10 @@
     dict(name="exportContext", type="dict", fields=[
         dict(name="fileType", allow_empty=False),
         dict(name="uri", allow_empty=False),
-        dict(name="databases", type="list"),
+        dict(name="databases", optional=True, type="list"),
         dict(name="sqlExportOptions", type="dict", optional=True, fields=[
-            dict(name="tables", type="list"),
-            dict(name="schemaOnly")
+            dict(name="tables", optional=True, type="list"),
+            dict(name="schemaOnly", optional=True)
         ]),
         dict(name="csvExportOptions", type="dict", optional=True, fields=[
             dict(name="selectQuery")
@@ -117,7 +117,7 @@
         ])
     ])
 ]
-CLOUD_SQL_DATABASE_INSERT_VALIDATION = [
+CLOUD_SQL_DATABASE_CREATE_VALIDATION = [
     dict(name="instance", allow_empty=False),
     dict(name="name", allow_empty=False),
     dict(name="project", allow_empty=False),
@@ -142,7 +142,7 @@ class CloudSqlBaseOperator(BaseOperator):
     :type instance: str
     :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
     :type gcp_conn_id: str
-    :param api_version: API version used (e.g. v1).
+    :param api_version: API version used (e.g. v1beta4).
     :type api_version: str
     """
     @apply_defaults
@@ -241,17 +241,21 @@ def _validate_inputs(self):
 
     def _validate_body_fields(self):
         if self.validate_body:
-            GcpBodyFieldValidator(CLOUD_SQL_VALIDATION,
+            GcpBodyFieldValidator(CLOUD_SQL_CREATE_VALIDATION,
                                   api_version=self.api_version).validate(self.body)
 
     def execute(self, context):
         self._validate_body_fields()
         if not self._check_if_instance_exists(self.instance):
-            return self._hook.create_instance(self.project_id, self.body)
+            self._hook.create_instance(self.project_id, self.body)
         else:
             self.log.info("Cloud SQL instance with ID {} already exists. "
                           "Aborting create.".format(self.instance))
-            return True
+
+        instance_resource = self._hook.get_instance(self.project_id, self.instance)
+        service_account_email = instance_resource["serviceAccountEmailAddress"]
+        task_instance = context['task_instance']
+        task_instance.xcom_push(key="service_account_email", value=service_account_email)
 
 
 class CloudSqlInstancePatchOperator(CloudSqlBaseOperator):
@@ -389,7 +393,7 @@ def _validate_inputs(self):
 
     def _validate_body_fields(self):
         if self.validate_body:
-            GcpBodyFieldValidator(CLOUD_SQL_DATABASE_INSERT_VALIDATION,
+            GcpBodyFieldValidator(CLOUD_SQL_DATABASE_CREATE_VALIDATION,
                                   api_version=self.api_version).validate(self.body)
 
     def execute(self, context):
@@ -526,6 +530,131 @@ def execute(self, context):
                                               self.database)
 
 
+class CloudSqlInstanceExportOperator(CloudSqlBaseOperator):
+    """
+    Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump
+    or CSV file.
+
+    Note: This operator is idempotent. If executed multiple times with the same
+    export file URI, the export file in GCS will simply be overwritten.
+
+    :param project_id: Project ID of the project that contains the instance to be
+        exported.
+    :type project_id: str
+    :param instance: Cloud SQL instance ID. This does not include the project ID.
+    :type instance: str
+    :param body: The request body, as described in
+        https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/export#request-body
+    :type body: dict
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: API version used (e.g. v1beta4).
+    :type api_version: str
+    :param validate_body: Whether the body should be validated. Defaults to True.
+    :type validate_body: bool
+    """
+    # [START gcp_sql_export_template_fields]
+    template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
+    # [END gcp_sql_export_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance,
+                 body,
+                 gcp_conn_id='google_cloud_default',
+                 api_version='v1beta4',
+                 validate_body=True,
+                 *args, **kwargs):
+        self.body = body
+        self.validate_body = validate_body
+        super(CloudSqlInstanceExportOperator, self).__init__(
+            project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
+            api_version=api_version, *args, **kwargs)
+
+    def _validate_inputs(self):
+        super(CloudSqlInstanceExportOperator, self)._validate_inputs()
+        if not self.body:
+            raise AirflowException("The required parameter 'body' is empty")
+
+    def _validate_body_fields(self):
+        if self.validate_body:
+            GcpBodyFieldValidator(CLOUD_SQL_EXPORT_VALIDATION,
+                                  api_version=self.api_version).validate(self.body)
+
+    def execute(self, context):
+        self._validate_body_fields()
+        return self._hook.export_instance(self.project_id, self.instance, self.body)
+
+
+class CloudSqlInstanceImportOperator(CloudSqlBaseOperator):
+    """
+    Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.
+
+    CSV IMPORT:
+
+    This operator is NOT idempotent for a CSV import. If the same file is imported
+    multiple times, the imported data will be duplicated in the database.
+    Moreover, if there are any unique constraints the duplicate import may result in an
+    error.
+
+    SQL IMPORT:
+
+    This operator is idempotent for a SQL import if it was also exported by Cloud SQL.
+    The exported SQL contains 'DROP TABLE IF EXISTS' statements for all tables
+    to be imported.
+
+    If the import file was generated in a different way, idempotence is not guaranteed.
+    It has to be ensured on the SQL file level.
+
+    :param project_id: Project ID of the project that contains the instance.
+    :type project_id: str
+    :param instance: Cloud SQL instance ID. This does not include the project ID.
+    :type instance: str
+    :param body: The request body, as described in
+        https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/import#request-body
+    :type body: dict
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform.
+    :type gcp_conn_id: str
+    :param api_version: API version used (e.g. v1beta4).
+    :type api_version: str
+    :param validate_body: Whether the body should be validated. Defaults to True.
+    :type validate_body: bool
+    """
+    # [START gcp_sql_import_template_fields]
+    template_fields = ('project_id', 'instance', 'gcp_conn_id', 'api_version')
+    # [END gcp_sql_import_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 project_id,
+                 instance,
+                 body,
+                 gcp_conn_id='google_cloud_default',
+                 api_version='v1beta4',
+                 validate_body=True,
+                 *args, **kwargs):
+        self.body = body
+        self.validate_body = validate_body
+        super(CloudSqlInstanceImportOperator, self).__init__(
+            project_id=project_id, instance=instance, gcp_conn_id=gcp_conn_id,
+            api_version=api_version, *args, **kwargs)
+
+    def _validate_inputs(self):
+        super(CloudSqlInstanceImportOperator, self)._validate_inputs()
+        if not self.body:
+            raise AirflowException("The required parameter 'body' is empty")
+
+    def _validate_body_fields(self):
+        if self.validate_body:
+            GcpBodyFieldValidator(CLOUD_SQL_IMPORT_VALIDATION,
+                                  api_version=self.api_version).validate(self.body)
+
+    def execute(self, context):
+        self._validate_body_fields()
+        return self._hook.import_instance(self.project_id, self.instance, self.body)
+
+
 class CloudSqlQueryOperator(BaseOperator):
     """
     Performs DML or DDL query on an existing Cloud Sql instance. It optionally uses
diff --git a/airflow/contrib/operators/gcs_acl_operator.py b/airflow/contrib/operators/gcs_acl_operator.py
index d918444131..a39b8cf5d9 100644
--- a/airflow/contrib/operators/gcs_acl_operator.py
+++ b/airflow/contrib/operators/gcs_acl_operator.py
@@ -29,14 +29,8 @@ class GoogleCloudStorageBucketCreateAclEntryOperator(BaseOperator):
     :param bucket: Name of a bucket.
     :type bucket: str
     :param entity: The entity holding the permission, in one of the following forms:
-        - user-userId
-        - user-email
-        - group-groupId
-        - group-email
-        - domain-domain
-        - project-team-projectId
-        - allUsers
-        - allAuthenticatedUsers
+        user-userId, user-email, group-groupId, group-email, domain-domain,
+        project-team-projectId, allUsers, allAuthenticatedUsers
     :type entity: str
     :param role: The access permission for the entity.
         Acceptable values are: "OWNER", "READER", "WRITER".
@@ -82,14 +76,8 @@ class GoogleCloudStorageObjectCreateAclEntryOperator(BaseOperator):
         https://cloud.google.com/storage/docs/json_api/#encoding
     :type object_name: str
     :param entity: The entity holding the permission, in one of the following forms:
-        - user-userId
-        - user-email
-        - group-groupId
-        - group-email
-        - domain-domain
-        - project-team-projectId
-        - allUsers
-        - allAuthenticatedUsers
+        user-userId, user-email, group-groupId, group-email, domain-domain,
+        project-team-projectId, allUsers, allAuthenticatedUsers
     :type entity: str
     :param role: The access permission for the entity.
         Acceptable values are: "OWNER", "READER".
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index dc0f4c99d3..9502736737 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -719,6 +719,178 @@ More information
 See `Google Cloud SQL API documentation for delete
 <https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/delete>`_.
 
+.. _CloudSqlInstanceExportOperator:
+
+CloudSqlInstanceExportOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Exports data from a Cloud SQL instance to a Cloud Storage bucket as a SQL dump
+or CSV file.
+
+Note: This operator is idempotent. If executed multiple times with the same
+export file URI, the export file in GCS will simply be overwritten.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceExportOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from OS environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_arguments]
+    :end-before: [END howto_operator_cloudsql_arguments]
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_export_import_arguments]
+    :end-before: [END howto_operator_cloudsql_export_import_arguments]
+
+Example body defining the export operation:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_export_body]
+    :end-before: [END howto_operator_cloudsql_export_body]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_export]
+    :end-before: [END howto_operator_cloudsql_export]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gcp_sql_export_template_fields]
+    :end-before: [END gcp_sql_export_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud SQL API documentation for export <https://cloud.google
+.com/sql/docs/mysql/admin-api/v1beta4/instances/export>`_.
+
+Troubleshooting
+"""""""""""""""
+
+If you receive an "Unauthorized" error in GCP, make sure that the service account
+of the Cloud SQL instance is authorized to write to the selected GCS bucket.
+
+It is not the service account configured in Airflow that communicates with GCS,
+but rather the service account of the particular Cloud SQL instance.
+
+To grant the service account the appropriate WRITE permissions for the GCS bucket
+you can use the :class:`~airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageBucketCreateAclEntryOperator`,
+as shown in the example:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_export_gcs_permissions]
+    :end-before: [END howto_operator_cloudsql_export_gcs_permissions]
+
+
+.. _CloudSqlInstanceImportOperator:
+
+CloudSqlInstanceImportOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+Imports data into a Cloud SQL instance from a SQL dump or CSV file in Cloud Storage.
+
+CSV import:
+"""""""""""
+
+This operator is NOT idempotent for a CSV import. If the same file is imported
+multiple times, the imported data will be duplicated in the database.
+Moreover, if there are any unique constraints the duplicate import may result in an
+error.
+
+SQL import:
+"""""""""""
+
+This operator is idempotent for a SQL import if it was also exported by Cloud SQL.
+The exported SQL contains 'DROP TABLE IF EXISTS' statements for all tables
+to be imported.
+
+If the import file was generated in a different way, idempotence is not guaranteed.
+It has to be ensured on the SQL file level.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceImportOperator`.
+
+Arguments
+"""""""""
+
+Some arguments in the example DAG are taken from OS environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_arguments]
+    :end-before: [END howto_operator_cloudsql_arguments]
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_export_import_arguments]
+    :end-before: [END howto_operator_cloudsql_export_import_arguments]
+
+Example body defining the import operation:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :start-after: [START howto_operator_cloudsql_import_body]
+    :end-before: [END howto_operator_cloudsql_import_body]
+
+Using the operator
+""""""""""""""""""
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_import]
+    :end-before: [END howto_operator_cloudsql_import]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gcp_sql_import_template_fields]
+    :end-before: [END gcp_sql_import_template_fields]
+
+More information
+""""""""""""""""
+
+See `Google Cloud SQL API documentation for import <https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/import>`_.
+
+Troubleshooting
+"""""""""""""""
+
+If you receive an "Unauthorized" error in GCP, make sure that the service account
+of the Cloud SQL instance is authorized to read from the selected GCS object.
+
+It is not the service account configured in Airflow that communicates with GCS,
+but rather the service account of the particular Cloud SQL instance.
+
+To grant the service account the appropriate READ permissions for the GCS object
+you can use the :class:`~airflow.contrib.operators.gcs_acl_operator.GoogleCloudStorageObjectCreateAclEntryOperator`,
+as shown in the example:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
+    :language: python
+    :dedent: 4
+    :start-after: [START howto_operator_cloudsql_import_gcs_permissions]
+    :end-before: [END howto_operator_cloudsql_import_gcs_permissions]
+
 .. _CloudSqlInstanceCreateOperator:
 
 CloudSqlInstanceCreateOperator
@@ -773,7 +945,6 @@ More information
 See `Google Cloud SQL API documentation for insert <https://cloud.google
 .com/sql/docs/mysql/admin-api/v1beta4/instances/insert>`_.
 
-
 .. _CloudSqlInstancePatchOperator:
 
 CloudSqlInstancePatchOperator
diff --git a/docs/integration.rst b/docs/integration.rst
index 471ce74ed4..9aaae0f119 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -547,6 +547,8 @@ Cloud SQL Operators
 - :ref:`CloudSqlInstanceDatabasePatchOperator` : updates a database inside a Cloud
   SQL instance.
 - :ref:`CloudSqlInstanceDeleteOperator` : delete a Cloud SQL instance.
+- :ref:`CloudSqlInstanceExportOperator` : exports data from a Cloud SQL instance.
+- :ref:`CloudSqlInstanceImportOperator` : imports data into a Cloud SQL instance.
 - :ref:`CloudSqlInstanceCreateOperator` : create a new Cloud SQL instance.
 - :ref:`CloudSqlInstancePatchOperator` : patch a Cloud SQL instance.
 - :ref:`CloudSqlQueryOperator` : run query in a Cloud SQL instance.
@@ -579,6 +581,20 @@ CloudSqlInstanceDeleteOperator
 
 .. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDeleteOperator
 
+.. _CloudSqlInstanceExportOperator:
+
+CloudSqlInstanceExportOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceExportOperator
+
+.. _CloudSqlInstanceImportOperator:
+
+CloudSqlInstanceImportOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceImportOperator
+
 .. _CloudSqlInstanceCreateOperator:
 
 CloudSqlInstanceCreateOperator
diff --git a/tests/contrib/hooks/test_gcp_sql_hook.py b/tests/contrib/hooks/test_gcp_sql_hook.py
new file mode 100644
index 0000000000..cb56237736
--- /dev/null
+++ b/tests/contrib/hooks/test_gcp_sql_hook.py
@@ -0,0 +1,63 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import unittest
+
+from googleapiclient.errors import HttpError
+
+from airflow.contrib.hooks.gcp_sql_hook import CloudSqlHook
+from airflow.exceptions import AirflowException
+
+try:
+    from unittest import mock
+except ImportError:
+    try:
+        import mock
+    except ImportError:
+        mock = None
+
+
+class TestGcpSqlHook(unittest.TestCase):
+    def test_instance_import_ex(self):
+        # Mocking __init__ with an empty anonymous function
+        with mock.patch.object(CloudSqlHook, "__init__", lambda x, y, z: None):
+            hook = CloudSqlHook(None, None)
+            # Simulating HttpError inside import_instance
+            hook.get_conn = mock.Mock(
+                side_effect=HttpError(resp={'status': '400'},
+                                      content='Error content'.encode('utf-8'))
+            )
+            with self.assertRaises(AirflowException) as cm:
+                hook.import_instance(None, None, None)
+            err = cm.exception
+            self.assertIn("Importing instance ", str(err))
+
+    def test_instance_export_ex(self):
+        # Mocking __init__ with an empty anonymous function
+        with mock.patch.object(CloudSqlHook, "__init__", lambda x, y, z: None):
+            hook = CloudSqlHook(None, None)
+            # Simulating HttpError inside export_instance
+            hook.get_conn = mock.Mock(
+                side_effect=HttpError(resp={'status': '400'},
+                                      content='Error content'.encode('utf-8'))
+            )
+            with self.assertRaises(AirflowException) as cm:
+                hook.export_instance(None, None, None)
+            err = cm.exception
+            self.assertIn("Exporting instance ", str(err))
diff --git a/tests/contrib/operators/test_gcp_sql_operator.py b/tests/contrib/operators/test_gcp_sql_operator.py
index 516fcef4aa..6a0a4cde7f 100644
--- a/tests/contrib/operators/test_gcp_sql_operator.py
+++ b/tests/contrib/operators/test_gcp_sql_operator.py
@@ -18,17 +18,18 @@
 # under the License.
 import json
 import os
-import time
 import unittest
-from uuid import uuid1
 
+import time
 from parameterized import parameterized
+from uuid import uuid1
 
 from airflow import AirflowException
 from airflow.contrib.hooks.gcp_sql_hook import CloudSqlProxyRunner
 from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
     CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \
     CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \
+    CloudSqlInstanceExportOperator, CloudSqlInstanceImportOperator, \
     CloudSqlInstanceDatabaseDeleteOperator, CloudSqlQueryOperator
 from airflow.models import Connection
 from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
@@ -137,6 +138,36 @@
     "charset": "utf16",
     "collation": "utf16_general_ci"
 }
+EXPORT_BODY = {
+    "exportContext": {
+        "fileType": "CSV",
+        "uri": "gs://bucketName/fileName",
+        "databases": [],
+        "sqlExportOptions": {
+            "tables": [
+                "table1", "table2"
+            ],
+            "schemaOnly": False
+        },
+        "csvExportOptions": {
+            "selectQuery": "SELECT * FROM ..."
+        }
+    }
+}
+IMPORT_BODY = {
+    "importContext": {
+        "fileType": "CSV",
+        "uri": "gs://bucketName/fileName",
+        "database": "db1",
+        "importUser": "",
+        "csvImportOptions": {
+            "table": "my_table",
+            "columns": [
+                "col1", "col2"
+            ]
+        }
+    }
+}
 
 
 class CloudSqlTest(unittest.TestCase):
@@ -152,13 +183,15 @@ def test_instance_create(self, mock_hook, _check_if_instance_exists):
             body=CREATE_BODY,
             task_id="id"
         )
-        result = op.execute(None)
+        result = op.execute(context={
+            'task_instance': mock.Mock()
+        })
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.create_instance.assert_called_once_with(
             PROJECT_ID, CREATE_BODY
         )
-        self.assertTrue(result)
+        self.assertIsNone(result)
 
     @mock.patch("airflow.contrib.operators.gcp_sql_operator"
                 ".CloudSqlInstanceCreateOperator._check_if_instance_exists")
@@ -172,11 +205,13 @@ def test_instance_create_idempotent(self, mock_hook, _check_if_instance_exists):
             body=CREATE_BODY,
             task_id="id"
         )
-        result = op.execute(None)
+        result = op.execute(context={
+            'task_instance': mock.Mock()
+        })
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.create_instance.assert_not_called()
-        self.assertTrue(result)
+        self.assertIsNone(result)
 
     @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
     def test_create_should_throw_ex_when_empty_project_id(self, mock_hook):
@@ -475,6 +510,40 @@ def test_instance_db_delete_should_abort_and_succeed_if_not_exists(
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.delete_database.assert_not_called()
 
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_export(self, mock_hook):
+        mock_hook.return_value.export_instance.return_value = True
+        op = CloudSqlInstanceExportOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            body=EXPORT_BODY,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.export_instance.assert_called_once_with(
+            PROJECT_ID, INSTANCE_NAME, EXPORT_BODY
+        )
+        self.assertTrue(result)
+
+    @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
+    def test_instance_import(self, mock_hook):
+        mock_hook.return_value.export_instance.return_value = True
+        op = CloudSqlInstanceImportOperator(
+            project_id=PROJECT_ID,
+            instance=INSTANCE_NAME,
+            body=IMPORT_BODY,
+            task_id="id"
+        )
+        result = op.execute(None)
+        mock_hook.assert_called_once_with(api_version="v1beta4",
+                                          gcp_conn_id="google_cloud_default")
+        mock_hook.return_value.import_instance.assert_called_once_with(
+            PROJECT_ID, INSTANCE_NAME, IMPORT_BODY
+        )
+        self.assertTrue(result)
+
 
 class CloudSqlQueryValidationTest(unittest.TestCase):
     @parameterized.expand([
@@ -789,6 +858,19 @@ def test_start_proxy_with_all_instances_specific_version(self):
         self.assertEqual(runner.get_proxy_version(), "1.13")
 
 
+@unittest.skipIf(
+    BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
+class CloudSqlExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudSqlExampleDagsIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_sql',
+            gcp_key=GCP_CLOUDSQL_KEY)
+
+    def test_run_example_dag_cloudsql_query(self):
+        self._run_dag()
+
+
 @unittest.skipIf(
     BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
 class CloudSqlQueryExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Mime
View raw message