airflow-commits mailing list archives

From "ASF GitHub Bot (JIRA)" <j...@apache.org>
Subject [jira] [Commented] (AIRFLOW-3275) Google Cloud SQL Query operator
Date Tue, 13 Nov 2018 09:44:01 GMT

    [ https://issues.apache.org/jira/browse/AIRFLOW-3275?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16684953#comment-16684953 ] 

ASF GitHub Bot commented on AIRFLOW-3275:
-----------------------------------------

kaxil closed pull request #4170: [AIRFLOW-3275] Implement Google Cloud SQL Query operator
URL: https://github.com/apache/incubator-airflow/pull/4170
 
 
   

This is a PR merged from a forked repository. As GitHub hides the original
diff on merge, it is reproduced below for the sake of provenance.
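At its core, the change adds a CloudSqlQueryOperator (backed by a new CloudSqlDatabaseHook and CloudSqlProxyRunner). A minimal usage sketch, assuming a gcpcloudsql:// connection named 'public_postgres_tcp' like the ones defined in the example DAG further down in the diff:

    import airflow
    from airflow import models
    from airflow.contrib.operators.gcp_sql_operator import CloudSqlQueryOperator

    default_args = {'start_date': airflow.utils.dates.days_ago(1)}

    with models.DAG(dag_id='example_gcp_sql_query_sketch',
                    default_args=default_args,
                    schedule_interval=None) as dag:
        # The operator resolves the gcpcloudsql:// connection, optionally starts
        # cloud-sql-proxy and runs the (idempotent) DDL/DML statements.
        query_task = CloudSqlQueryOperator(
            gcp_cloudsql_conn_id='public_postgres_tcp',
            task_id='example_gcp_sql_task',
            sql=['CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
                 'DROP TABLE TABLE_TEST'])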

diff --git a/airflow/contrib/example_dags/example_gcp_compute.py b/airflow/contrib/example_dags/example_gcp_compute.py
index 51a55b6a99..928e9744b6 100644
--- a/airflow/contrib/example_dags/example_gcp_compute.py
+++ b/airflow/contrib/example_dags/example_gcp_compute.py
@@ -21,15 +21,15 @@
 Example Airflow DAG that starts, stops and sets the machine type of a Google Compute
 Engine instance.
 
-This DAG relies on the following Airflow variables
-https://airflow.apache.org/concepts.html#variables
+This DAG relies on the following OS environment variables
+
 * PROJECT_ID - Google Cloud Platform project where the Compute Engine instance exists.
 * ZONE - Google Cloud Platform zone where the instance exists.
 * INSTANCE - Name of the Compute Engine instance.
 * SHORT_MACHINE_TYPE_NAME - Machine type resource name to set, e.g. 'n1-standard-1'.
     See https://cloud.google.com/compute/docs/machine-types
 """
-
+import os
 import datetime
 
 import airflow
@@ -38,17 +38,17 @@
     GceInstanceStopOperator, GceSetMachineTypeOperator
 
 # [START howto_operator_gce_args_common]
-PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow')
-ZONE = models.Variable.get('ZONE', 'europe-west1-b')
-INSTANCE = models.Variable.get('INSTANCE', 'test-instance')
+PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
+ZONE = os.environ.get('ZONE', 'europe-west1-b')
+INSTANCE = os.environ.get('INSTANCE', 'testinstance')
+# [END howto_operator_gce_args_common]
 
 default_args = {
     'start_date': airflow.utils.dates.days_ago(1)
 }
-# [END howto_operator_gce_args_common]
 
 # [START howto_operator_gce_args_set_machine_type]
-SHORT_MACHINE_TYPE_NAME = models.Variable.get('SHORT_MACHINE_TYPE_NAME', 'n1-standard-1')
+SHORT_MACHINE_TYPE_NAME = os.environ.get('SHORT_MACHINE_TYPE_NAME', 'n1-standard-1')
 SET_MACHINE_TYPE_BODY = {
     'machineType': 'zones/{}/machineTypes/{}'.format(ZONE, SHORT_MACHINE_TYPE_NAME)
 }
diff --git a/airflow/contrib/example_dags/example_gcp_compute_igm.py b/airflow/contrib/example_dags/example_gcp_compute_igm.py
index dc24259f9f..3e4543c60d 100644
--- a/airflow/contrib/example_dags/example_gcp_compute_igm.py
+++ b/airflow/contrib/example_dags/example_gcp_compute_igm.py
@@ -50,11 +50,11 @@
 # [START howto_operator_compute_igm_common_args]
 PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
 ZONE = os.environ.get('ZONE', 'europe-west1-b')
+# [END howto_operator_compute_igm_common_args]
 
 default_args = {
     'start_date': airflow.utils.dates.days_ago(1)
 }
-# [END howto_operator_compute_igm_common_args]
 
 # [START howto_operator_compute_template_copy_args]
 TEMPLATE_NAME = os.environ.get('TEMPLATE_NAME', 'instance-template-test')
diff --git a/airflow/contrib/example_dags/example_gcp_function_delete.py b/airflow/contrib/example_dags/example_gcp_function_delete.py
index d87eed39c5..642e3a744c 100644
--- a/airflow/contrib/example_dags/example_gcp_function_delete.py
+++ b/airflow/contrib/example_dags/example_gcp_function_delete.py
@@ -19,13 +19,13 @@
 
 """
 Example Airflow DAG that deletes a Google Cloud Function.
-This DAG relies on the following Airflow variables
-https://airflow.apache.org/concepts.html#variables
+This DAG relies on the following OS environment variables
 * PROJECT_ID - Google Cloud Project where the Cloud Function exists.
 * LOCATION - Google Cloud Functions region where the function exists.
 * ENTRYPOINT - Name of the executable function in the source code.
 """
 
+import os
 import datetime
 
 import airflow
@@ -33,17 +33,18 @@
 from airflow.contrib.operators.gcp_function_operator import GcfFunctionDeleteOperator
 
 # [START howto_operator_gcf_delete_args]
-PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow')
-LOCATION = models.Variable.get('LOCATION', 'europe-west1')
-ENTRYPOINT = models.Variable.get('ENTRYPOINT', 'helloWorld')
+PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
+LOCATION = os.environ.get('LOCATION', 'europe-west1')
+ENTRYPOINT = os.environ.get('ENTRYPOINT', 'helloWorld')
 # A fully-qualified name of the function to delete
 
 FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                                ENTRYPOINT)
+# [END howto_operator_gcf_delete_args]
+
 default_args = {
     'start_date': airflow.utils.dates.days_ago(1)
 }
-# [END howto_operator_gcf_delete_args]
 
 with models.DAG(
     'example_gcp_function_delete',
diff --git a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
index 606cc181b0..76563d7596 100644
--- a/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
+++ b/airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
@@ -20,7 +20,7 @@
 """
 Example Airflow DAG that creates a Google Cloud Function and then deletes it.
 
-This DAG relies on the following Airflow variables
+This DAG relies on the following OS environment variables
 https://airflow.apache.org/concepts.html#variables
 * PROJECT_ID - Google Cloud Project to use for the Cloud Function.
 * LOCATION - Google Cloud Functions region where the function should be
@@ -37,6 +37,7 @@
 * ENTRYPOINT - Name of the executable function in the source code.
 """
 
+import os
 import datetime
 
 from airflow import models
@@ -45,20 +46,20 @@
 from airflow.utils import dates
 
 # [START howto_operator_gcf_deploy_variables]
-PROJECT_ID = models.Variable.get('PROJECT_ID', 'example-airflow')
-LOCATION = models.Variable.get('LOCATION', 'europe-west1')
-SOURCE_ARCHIVE_URL = models.Variable.get('SOURCE_ARCHIVE_URL', '')
-SOURCE_UPLOAD_URL = models.Variable.get('SOURCE_UPLOAD_URL', '')
-SOURCE_REPOSITORY = models.Variable.get('SOURCE_REPOSITORY',
-                                        'https://source.developers.google.com/'
-                                        'projects/example-airflow/'
-                                        'repos/hello-world/moveable-aliases/master')
-ZIP_PATH = models.Variable.get('ZIP_PATH', '')
-ENTRYPOINT = models.Variable.get('ENTRYPOINT', '')
+PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
+LOCATION = os.environ.get('LOCATION', 'europe-west1')
+SOURCE_ARCHIVE_URL = os.environ.get('SOURCE_ARCHIVE_URL', '')
+SOURCE_UPLOAD_URL = os.environ.get('SOURCE_UPLOAD_URL', '')
+SOURCE_REPOSITORY = os.environ.get(
+    'SOURCE_REPOSITORY',
+    'https://source.developers.google.com/'
+    'projects/example-project/repos/hello-world/moveable-aliases/master')
+ZIP_PATH = os.environ.get('ZIP_PATH', '')
+ENTRYPOINT = os.environ.get('ENTRYPOINT', 'helloWorld')
 FUNCTION_NAME = 'projects/{}/locations/{}/functions/{}'.format(PROJECT_ID, LOCATION,
                                                                ENTRYPOINT)
 RUNTIME = 'nodejs6'
-VALIDATE_BODY = models.Variable.get('VALIDATE_BODY', True)
+VALIDATE_BODY = os.environ.get('VALIDATE_BODY', True)
 
 # [END howto_operator_gcf_deploy_variables]
 
@@ -71,11 +72,11 @@
 }
 # [END howto_operator_gcf_deploy_body]
 
-# [START howto_operator_gcf_deploy_args]
+# [START howto_operator_gcf_default_args]
 default_args = {
     'start_date': dates.days_ago(1)
 }
-# [END howto_operator_gcf_deploy_args]
+# [END howto_operator_gcf_default_args]
 
 # [START howto_operator_gcf_deploy_variants]
 if SOURCE_ARCHIVE_URL:
@@ -111,6 +112,6 @@
     # [END howto_operator_gcf_deploy]
     delete_task = GcfFunctionDeleteOperator(
         task_id="gcf_delete_task",
-        name=FUNCTION_NAME,
+        name=FUNCTION_NAME
     )
     deploy_task >> delete_task
diff --git a/airflow/contrib/example_dags/example_gcp_sql.py b/airflow/contrib/example_dags/example_gcp_sql.py
index 136c88c843..45c0895b0f 100644
--- a/airflow/contrib/example_dags/example_gcp_sql.py
+++ b/airflow/contrib/example_dags/example_gcp_sql.py
@@ -21,7 +21,8 @@
 Example Airflow DAG that creates, patches and deletes a Cloud SQL instance, and also
 creates, patches and deletes a database inside the instance, in Google Cloud Platform.
 
-This DAG relies on the following environment variables
+This DAG relies on the following OS environment variables
+
 * PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance.
 * INSTANCE_NAME - Name of the Cloud SQL instance.
 * DB_NAME - Name of the database inside a Cloud SQL instance.
@@ -40,7 +41,7 @@
 
 # [START howto_operator_cloudsql_arguments]
 PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
-INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testinstance')
+INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'testpostgres')
 DB_NAME = os.environ.get('DB_NAME', 'testdb')
 # [END howto_operator_cloudsql_arguments]
 
diff --git a/airflow/contrib/example_dags/example_gcp_sql_query.py b/airflow/contrib/example_dags/example_gcp_sql_query.py
new file mode 100644
index 0000000000..af7c5c2447
--- /dev/null
+++ b/airflow/contrib/example_dags/example_gcp_sql_query.py
@@ -0,0 +1,259 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+"""
+Example Airflow DAG that performs a query in a Cloud SQL instance.
+
+This DAG relies on the following OS environment variables
+
+* PROJECT_ID - Google Cloud Platform project for the Cloud SQL instance
+* LOCATION - Google Cloud location where the database is created
+*
+* POSTGRES_INSTANCE_NAME - Name of the postgres Cloud SQL instance
+* POSTGRES_USER - Name of the postgres database user
+* POSTGRES_PASSWORD - Password of the postgres database user
+* POSTGRES_PROXY_PORT - Local port number for proxy connections for postgres
+* POSTGRES_PUBLIC_IP - Public IP of the Postgres database
+* POSTGRES_PUBLIC_PORT - Port of the postgres database
+*
+* MYSQL_INSTANCE_NAME - Name of the mysql Cloud SQL instance
+* MYSQL_USER - Name of the mysql database user
+* MYSQL_PASSWORD - Password of the mysql database user
+* MYSQL_PROXY_PORT - Local port number for proxy connections for mysql
+* MYSQL_PUBLIC_IP - Public IP of the mysql database
+* MYSQL_PUBLIC_PORT - Port of the mysql database
+"""
+
+import os
+import subprocess
+
+from six.moves.urllib.parse import quote_plus
+
+import airflow
+from airflow import models
+from airflow.contrib.operators.gcp_sql_operator import CloudSqlQueryOperator
+
+# [START howto_operator_cloudsql_query_arguments]
+
+PROJECT_ID = os.environ.get('PROJECT_ID', 'example-project')
+LOCATION = os.environ.get('LOCATION', 'europe-west1')
+
+POSTGRES_INSTANCE_NAME = os.environ.get('POSTGRES_INSTANCE_NAME', 'testpostgres')
+POSTGRES_DATABASE_NAME = os.environ.get('POSTGRES_DATABASE_NAME', 'postgresdb')
+POSTGRES_USER = os.environ.get('POSTGRES_USER', 'postgres_user')
+POSTGRES_PASSWORD = os.environ.get('POSTGRES_PASSWORD', 'password')
+POSTGRES_PUBLIC_IP = os.environ.get('POSTGRES_PUBLIC_IP', '0.0.0.0')
+POSTGRES_PUBLIC_PORT = os.environ.get('POSTGRES_PUBLIC_PORT', 5432)
+POSTGRES_CLIENT_CERT_FILE = os.environ.get('POSTGRES_CLIENT_CERT_FILE',
+                                           "/tmp/client-cert.pem")
+POSTGRES_CLIENT_KEY_FILE = os.environ.get('POSTGRES_CLIENT_KEY_FILE',
+                                          "/tmp/client-key.pem")
+POSTGRES_SERVER_CA_FILE = os.environ.get('POSTGRES_SERVER_CA_FILE',
+                                         "/tmp/server-ca.pem")
+
+MYSQL_INSTANCE_NAME = os.environ.get('MYSQL_INSTANCE_NAME', 'testmysql')
+MYSQL_DATABASE_NAME = os.environ.get('MYSQL_DATABASE_NAME', 'mysqldb')
+MYSQL_USER = os.environ.get('MYSQL_USER', 'mysql_user')
+MYSQL_PASSWORD = os.environ.get('MYSQL_PASSWORD', 'password')
+MYSQL_PUBLIC_IP = os.environ.get('MYSQL_PUBLIC_IP', '0.0.0.0')
+MYSQL_PUBLIC_PORT = os.environ.get('MYSQL_PUBLIC_PORT', 3306)
+MYSQL_CLIENT_CERT_FILE = os.environ.get('MYSQL_CLIENT_CERT_FILE',
+                                        "/tmp/client-cert.pem")
+MYSQL_CLIENT_KEY_FILE = os.environ.get('MYSQL_CLIENT_KEY_FILE',
+                                       "/tmp/client-key.pem")
+MYSQL_SERVER_CA_FILE = os.environ.get('MYSQL_SERVER_CA_FILE',
+                                      "/tmp/server-ca.pem")
+
+SQL = [
+    'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',
+    'CREATE TABLE IF NOT EXISTS TABLE_TEST (I INTEGER)',  # shows warnings logged
+    'INSERT INTO TABLE_TEST VALUES (0)',
+    'CREATE TABLE IF NOT EXISTS TABLE_TEST2 (I INTEGER)',
+    'DROP TABLE TABLE_TEST',
+    'DROP TABLE TABLE_TEST2',
+]
+
+# [END howto_operator_cloudsql_query_arguments]
+default_args = {
+    'start_date': airflow.utils.dates.days_ago(1)
+}
+
+
+# [START howto_operator_cloudsql_query_connections]
+
+postgres_kwargs = dict(
+    user=quote_plus(POSTGRES_USER),
+    password=quote_plus(POSTGRES_PASSWORD),
+    public_port=POSTGRES_PUBLIC_PORT,
+    public_ip=quote_plus(POSTGRES_PUBLIC_IP),
+    project_id=quote_plus(PROJECT_ID),
+    location=quote_plus(LOCATION),
+    instance=quote_plus(POSTGRES_INSTANCE_NAME),
+    database=quote_plus(POSTGRES_DATABASE_NAME),
+    client_cert_file=quote_plus(POSTGRES_CLIENT_CERT_FILE),
+    client_key_file=quote_plus(POSTGRES_CLIENT_KEY_FILE),
+    server_ca_file=quote_plus(POSTGRES_SERVER_CA_FILE)
+)
+
+# The connections below are created using one of the standard approaches - via
+# environment variables named AIRFLOW_CONN_*. The connections can also be created
+# in the Airflow metadata database (using the command line or the UI).
+
+# Postgres: connect via proxy over TCP
+os.environ['AIRFLOW_CONN_PROXY_POSTGRES_TCP'] = \
+    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
+    "database_type=postgres&" \
+    "project_id={project_id}&" \
+    "location={location}&" \
+    "instance={instance}&" \
+    "use_proxy=True&" \
+    "sql_proxy_use_tcp=True".format(**postgres_kwargs)
+
+# Postgres: connect via proxy over UNIX socket (specific proxy version)
+os.environ['AIRFLOW_CONN_PROXY_POSTGRES_SOCKET'] = \
+    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
+    "database_type=postgres&" \
+    "project_id={project_id}&" \
+    "location={location}&" \
+    "instance={instance}&" \
+    "use_proxy=True&" \
+    "sql_proxy_version=v1.13&" \
+    "sql_proxy_use_tcp=False".format(**postgres_kwargs)
+
+# Postgres: connect directly via TCP (non-SSL)
+os.environ['AIRFLOW_CONN_PUBLIC_POSTGRES_TCP'] = \
+    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
+    "database_type=postgres&" \
+    "project_id={project_id}&" \
+    "location={location}&" \
+    "instance={instance}&" \
+    "use_proxy=False&" \
+    "use_ssl=False".format(**postgres_kwargs)
+
+# Postgres: connect directly via TCP (SSL)
+os.environ['AIRFLOW_CONN_PUBLIC_POSTGRES_TCP_SSL'] = \
+    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
+    "database_type=postgres&" \
+    "project_id={project_id}&" \
+    "location={location}&" \
+    "instance={instance}&" \
+    "use_proxy=False&" \
+    "use_ssl=True&" \
+    "sslcert={client_cert_file}&" \
+    "sslkey={client_key_file}&" \
+    "sslrootcert={server_ca_file}"\
+    .format(**postgres_kwargs)
+
+mysql_kwargs = dict(
+    user=quote_plus(MYSQL_USER),
+    password=quote_plus(MYSQL_PASSWORD),
+    public_port=MYSQL_PUBLIC_PORT,
+    public_ip=quote_plus(MYSQL_PUBLIC_IP),
+    project_id=quote_plus(PROJECT_ID),
+    location=quote_plus(LOCATION),
+    instance=quote_plus(MYSQL_INSTANCE_NAME),
+    database=quote_plus(MYSQL_DATABASE_NAME),
+    client_cert_file=quote_plus(MYSQL_CLIENT_CERT_FILE),
+    client_key_file=quote_plus(MYSQL_CLIENT_KEY_FILE),
+    server_ca_file=quote_plus(MYSQL_SERVER_CA_FILE)
+)
+
+# MySQL: connect via proxy over TCP (specific proxy version)
+os.environ['AIRFLOW_CONN_PROXY_MYSQL_TCP'] = \
+    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
+    "database_type=mysql&" \
+    "project_id={project_id}&" \
+    "location={location}&" \
+    "instance={instance}&" \
+    "use_proxy=True&" \
+    "sql_proxy_version=v1.13&" \
+    "sql_proxy_use_tcp=True".format(**mysql_kwargs)
+
+# MySQL: connect via proxy over UNIX socket using pre-downloaded Cloud Sql Proxy binary
+try:
+    sql_proxy_binary_path = subprocess.check_output(
+        ['which', 'cloud_sql_proxy']).rstrip()
+except subprocess.CalledProcessError:
+    sql_proxy_binary_path = "/tmp/anyhow_download_cloud_sql_proxy"
+
+os.environ['AIRFLOW_CONN_PROXY_MYSQL_SOCKET'] = \
+    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
+    "database_type=mysql&" \
+    "project_id={project_id}&" \
+    "location={location}&" \
+    "instance={instance}&" \
+    "use_proxy=True&" \
+    "sql_proxy_binary_path={sql_proxy_binary_path}&" \
+    "sql_proxy_use_tcp=False".format(
+        sql_proxy_binary_path=quote_plus(sql_proxy_binary_path), **mysql_kwargs)
+
+# MySQL: connect directly via TCP (non-SSL)
+os.environ['AIRFLOW_CONN_PUBLIC_MYSQL_TCP'] = \
+    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
+    "database_type=mysql&" \
+    "project_id={project_id}&" \
+    "location={location}&" \
+    "instance={instance}&" \
+    "use_proxy=False&" \
+    "use_ssl=False".format(**mysql_kwargs)
+
+# MySQL: connect directly via TCP (SSL) and with fixed Cloud Sql Proxy binary path
+os.environ['AIRFLOW_CONN_PUBLIC_MYSQL_TCP_SSL'] = \
+    "gcpcloudsql://{user}:{password}@{public_ip}:{public_port}/{database}?" \
+    "database_type=mysql&" \
+    "project_id={project_id}&" \
+    "location={location}&" \
+    "instance={instance}&" \
+    "use_proxy=False&" \
+    "use_ssl=True&" \
+    "sslcert={client_cert_file}&" \
+    "sslkey={client_key_file}&" \
+    "sslrootcert={server_ca_file}".format(**mysql_kwargs)
+
+# [END howto_operator_cloudsql_query_connections]
+
+# [START howto_operator_cloudsql_query_operators]
+
+connection_names = [
+    "proxy_postgres_tcp",
+    "proxy_postgres_socket",
+    "public_postgres_tcp",
+    "public_postgres_tcp_ssl",
+    "proxy_mysql_tcp",
+    "proxy_mysql_socket",
+    "public_mysql_tcp",
+    "public_mysql_tcp_ssl"
+]
+
+tasks = []
+
+with models.DAG(
+    dag_id='example_gcp_sql_query',
+    default_args=default_args,
+    schedule_interval=None
+) as dag:
+    for connection_name in connection_names:
+        tasks.append(
+            CloudSqlQueryOperator(
+                gcp_cloudsql_conn_id=connection_name,
+                task_id="example_gcp_sql_task_" + connection_name,
+                sql=SQL
+            )
+        )
+# [END howto_operator_cloudsql_query_operators]
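As the comment in the example DAG above notes, the gcpcloudsql:// connections do not have to come from AIRFLOW_CONN_* environment variables - they can also live in the Airflow metadata database. A rough sketch of creating one programmatically (conn_id and URI values are illustrative), mirroring what CloudSqlDatabaseHook.create_connection does in the hook further below:

    from airflow import models, settings

    session = settings.Session()
    connection = models.Connection(conn_id='public_postgres_tcp')
    # Same URI format as the AIRFLOW_CONN_PUBLIC_POSTGRES_TCP variable above
    connection.parse_from_uri(
        "gcpcloudsql://postgres_user:password@10.0.0.1:5432/postgresdb?"
        "database_type=postgres&project_id=example-project&location=europe-west1&"
        "instance=testpostgres&use_proxy=False&use_ssl=False")
    session.add(connection)
    session.commit()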
diff --git a/airflow/contrib/hooks/gcp_sql_hook.py b/airflow/contrib/hooks/gcp_sql_hook.py
index 549ceaf49c..aee0098b07 100644
--- a/airflow/contrib/hooks/gcp_sql_hook.py
+++ b/airflow/contrib/hooks/gcp_sql_hook.py
@@ -16,15 +16,34 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import errno
+import json
+import os
+import re
+import shutil
+import socket
+import platform
+import subprocess
 import time
+import uuid
+from os.path import isfile
+from subprocess import Popen, PIPE
+from six.moves.urllib.parse import quote_plus
+
+import requests
 from googleapiclient.discovery import build
 
-from airflow import AirflowException
+from airflow import AirflowException, LoggingMixin, models
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 
 # Number of retries - used by googleapiclient method calls to perform retries
 # For requests that are "retriable"
+from airflow.hooks.base_hook import BaseHook
+from airflow.hooks.mysql_hook import MySqlHook
+from airflow.hooks.postgres_hook import PostgresHook
+from airflow.models import Connection
+from airflow.utils.db import provide_session
+
 NUM_RETRIES = 5
 
 # Time to sleep between active checks of the operation results
@@ -89,9 +108,9 @@ def create_instance(self, project_id, body):
             Cloud SQL instances should belong.
         :type project_id: str
         :param body: Body required by the Cloud SQL insert API, as described in
-            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert#request-body
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/insert#request-body.
         :type body: dict
-        :return: True if the operation succeeded, raises an error otherwise
+        :return: True if the operation succeeded, raises an error otherwise.
         :rtype: bool
         """
         response = self.get_conn().instances().insert(
@@ -111,11 +130,11 @@ def patch_instance(self, project_id, body, instance):
         :param project_id: Project ID of the project that contains the instance.
         :type project_id: str
         :param body: Body required by the Cloud SQL patch API, as described in
-            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch#request-body
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/instances/patch#request-body.
         :type body: dict
         :param instance: Cloud SQL instance ID. This does not include the project ID.
         :type instance: str
-        :return: True if the operation succeeded, raises an error otherwise
+        :return: True if the operation succeeded, raises an error otherwise.
         :rtype: bool
         """
         response = self.get_conn().instances().patch(
@@ -155,7 +174,7 @@ def get_database(self, project_id, instance, database):
         :param database: Name of the database in the instance.
         :type database: str
         :return: A Cloud SQL database resource, as described in
-            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases#resource
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases#resource.
         :rtype: dict
         """
         return self.get_conn().databases().get(
@@ -173,9 +192,9 @@ def create_database(self, project, instance, body):
         :param instance: Database instance ID. This does not include the project ID.
         :type instance: str
         :param body: The request body, as described in
-            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body.
         :type body: dict
-        :return: True if the operation succeeded, raises an error otherwise
+        :return: True if the operation succeeded, raises an error otherwise.
         :rtype: bool
         """
         response = self.get_conn().databases().insert(
@@ -189,6 +208,7 @@ def create_database(self, project, instance, body):
     def patch_database(self, project, instance, database, body):
         """
         Updates a database resource inside a Cloud SQL instance.
+
         This method supports patch semantics.
         See: https://cloud.google.com/sql/docs/mysql/admin-api/how-tos/performance#patch
 
@@ -199,9 +219,9 @@ def patch_database(self, project, instance, database, body):
         :param database: Name of the database to be updated in the instance.
         :type database: str
         :param body: The request body, as described in
-            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body
+            https://cloud.google.com/sql/docs/mysql/admin-api/v1beta4/databases/insert#request-body.
         :type body: dict
-        :return: True if the operation succeeded, raises an error otherwise
+        :return: True if the operation succeeded, raises an error otherwise.
         :rtype: bool
         """
         response = self.get_conn().databases().patch(
@@ -223,7 +243,7 @@ def delete_database(self, project, instance, database):
         :type instance: str
         :param database: Name of the database to be deleted in the instance.
         :type database: str
-        :return: True if the operation succeeded, raises an error otherwise
+        :return: True if the operation succeeded, raises an error otherwise.
         :rtype: bool
         """
         response = self.get_conn().databases().delete(
@@ -241,9 +261,9 @@ def _wait_for_operation_to_complete(self, project_id, operation_name):
 
         :param project_id: Project ID of the project that contains the instance.
         :type project_id: str
-        :param operation_name: name of the operation
+        :param operation_name: Name of the operation.
         :type operation_name: str
-        :return: response returned by the operation
+        :return: Response returned by the operation.
         :rtype: dict
         """
         service = self.get_conn()
@@ -261,3 +281,571 @@ def _wait_for_operation_to_complete(self, project_id, operation_name):
                 # No meaningful info to return from the response in case of success
                 return True
             time.sleep(TIME_TO_SLEEP_IN_SECONDS)
+
+
+CLOUD_SQL_PROXY_DOWNLOAD_URL = "https://dl.google.com/cloudsql/cloud_sql_proxy.{}.{}"
+CLOUD_SQL_PROXY_VERSION_DOWNLOAD_URL = \
+    "https://storage.googleapis.com/cloudsql-proxy/{}/cloud_sql_proxy.{}.{}"
+
+GCP_CREDENTIALS_KEY_PATH = "extra__google_cloud_platform__key_path"
+GCP_CREDENTIALS_KEYFILE_DICT = "extra__google_cloud_platform__keyfile_dict"
+
+
+class CloudSqlProxyRunner(LoggingMixin):
+    """
+    Downloads and runs cloud-sql-proxy as subprocess of the python process.
+
+    The cloud-sql-proxy needs to be downloaded and started before we can connect
+    to the Google Cloud SQL instance via a database connection. It establishes
+    a secure tunnel connection to the database and authorizes using the
+    GCP credentials passed in the configuration.
+
+    More details about the proxy can be found here:
+    https://cloud.google.com/sql/docs/mysql/sql-proxy
+
+    """
+    def __init__(self,
+                 path_prefix,
+                 instance_specification,
+                 gcp_conn_id='google_cloud_default',
+                 project_id=None,
+                 sql_proxy_version=None,
+                 sql_proxy_binary_path=None):
+        """
+        Creates the proxy runner class.
+
+        :param path_prefix: Unique path prefix where proxy will be downloaded and
+            directories created for unix sockets.
+        :type path_prefix: str
+        :param instance_specification: Specification of the instance to connect the
+            proxy to. It should be specified in the form described for the
+            -instances parameter in
+            https://cloud.google.com/sql/docs/mysql/sql-proxy#multiple-instances
+            (typically <project>:<region>:<instance> for UNIX socket connections
+            and <project>:<region>:<instance>=tcp:<port> for TCP connections).
+        :type instance_specification: str
+        :param gcp_conn_id: ID of the Google Cloud Platform connection to use for
+            authentication.
+        :type gcp_conn_id: str
+        :param project_id: Optional ID of the GCP project to connect to - it overrides
+            the default project ID taken from the GCP connection.
+        :type project_id: str
+        :param sql_proxy_version: Specific version of SQL proxy to download
+            (for example 'v1.13'). By default latest version is downloaded.
+        :type sql_proxy_version: str
+        :param sql_proxy_binary_path: If specified, the binary at this path is used
+            rather than one at a dynamically generated path. If the binary is not
+            present at that path, it will be downloaded there.
+        :type sql_proxy_binary_path: str
+        """
+        super(CloudSqlProxyRunner, self).__init__()
+        self.path_prefix = path_prefix
+        if not self.path_prefix:
+            raise AirflowException("The path_prefix must not be empty!")
+        self.sql_proxy_was_downloaded = False
+        self.sql_proxy_version = sql_proxy_version
+        self.download_sql_proxy_dir = None
+        self.sql_proxy_process = None
+        self.instance_specification = instance_specification
+        self.project_id = project_id
+        self.gcp_conn_id = gcp_conn_id
+        self.command_line_parameters = []
+        self.cloud_sql_proxy_socket_directory = self.path_prefix
+        self.sql_proxy_path = sql_proxy_binary_path if sql_proxy_binary_path \
+            else self.path_prefix + "_cloud_sql_proxy"
+        self.credentials_path = self.path_prefix + "_credentials.json"
+        self._build_command_line_parameters()
+
+    def _build_command_line_parameters(self):
+        self.command_line_parameters.extend(
+            ['-dir', self.cloud_sql_proxy_socket_directory])
+        self.command_line_parameters.extend(
+            ['-instances', self.instance_specification])
+
+    @staticmethod
+    def _is_os_64bit():
+        return platform.machine().endswith('64')
+
+    def _download_sql_proxy_if_needed(self):
+        if os.path.isfile(self.sql_proxy_path):
+            self.log.info("cloud-sql-proxy is already present")
+            return
+        system = platform.system().lower()
+        processor = "amd64" if CloudSqlProxyRunner._is_os_64bit() else "386"
+        if not self.sql_proxy_version:
+            download_url = CLOUD_SQL_PROXY_DOWNLOAD_URL.format(system, processor)
+        else:
+            download_url = CLOUD_SQL_PROXY_VERSION_DOWNLOAD_URL.format(
+                self.sql_proxy_version, system, processor)
+        proxy_path_tmp = self.sql_proxy_path + ".tmp"
+        self.log.info("Downloading cloud_sql_proxy from {} to {}".
+                      format(download_url, proxy_path_tmp))
+        r = requests.get(download_url, allow_redirects=True)
+        # Downloading to .tmp file first to avoid case where partially downloaded
+        # binary is used by parallel operator which uses the same fixed binary path
+        with open(proxy_path_tmp, 'wb') as f:
+            f.write(r.content)
+        if r.status_code != 200:
+            raise AirflowException(
+                "The cloud-sql-proxy could not be downloaded. Status code = {}. "
+                "Reason = {}".format(r.status_code, r.reason))
+        self.log.info("Moving sql_proxy binary from {} to {}".format(
+            proxy_path_tmp, self.sql_proxy_path
+        ))
+        shutil.move(proxy_path_tmp, self.sql_proxy_path)
+        os.chmod(self.sql_proxy_path, 0o744)  # Set executable bit
+        self.sql_proxy_was_downloaded = True
+
+    @provide_session
+    def _get_credential_parameters(self, session):
+        connection = session.query(models.Connection). \
+            filter(models.Connection.conn_id == self.gcp_conn_id).first()
+        session.expunge_all()
+        if GCP_CREDENTIALS_KEY_PATH in connection.extra_dejson:
+            credential_params = [
+                '-credential_file',
+                connection.extra_dejson[GCP_CREDENTIALS_KEY_PATH]
+            ]
+        elif GCP_CREDENTIALS_KEYFILE_DICT in connection.extra_dejson:
+            credential_file_content = json.loads(
+                connection.extra_dejson[GCP_CREDENTIALS_KEYFILE_DICT])
+            self.log.info("Saving credentials to {}".format(self.credentials_path))
+            with open(self.credentials_path, "w") as f:
+                json.dump(credential_file_content, f)
+            credential_params = [
+                '-credential_file',
+                self.credentials_path
+            ]
+        else:
+            self.log.info(
+                "The credentials are not supplied by neither key_path nor "
+                "keyfile_dict of the gcp connection {}. Falling back to "
+                "default activated account".format(self.gcp_conn_id))
+            credential_params = []
+
+        if not self.instance_specification:
+            project_id = connection.extra_dejson.get(
+                'extra__google_cloud_platform__project')
+            if self.project_id:
+                project_id = self.project_id
+            if not project_id:
+                raise AirflowException("For forwarding all instances, the project id "
+                                       "for GCP should be provided either "
+                                       "by project_id extra in the GCP connection or by "
+                                       "project_id provided in the operator.")
+            credential_params.extend(['-projects', project_id])
+        return credential_params
+
+    def start_proxy(self):
+        """
+        Starts Cloud Sql Proxy.
+
+        You have to remember to stop the proxy if you started it!
+        """
+        self._download_sql_proxy_if_needed()
+        if self.sql_proxy_process:
+            raise AirflowException("The sql proxy is already running: {}".format(
+                self.sql_proxy_process))
+        else:
+            command_to_run = [self.sql_proxy_path]
+            command_to_run.extend(self.command_line_parameters)
+            try:
+                self.log.info("Creating directory {}".format(
+                    self.cloud_sql_proxy_socket_directory))
+                os.makedirs(self.cloud_sql_proxy_socket_directory)
+            except OSError:
+                # Needed for python 2 compatibility (os.makedirs lacks exist_ok)
+                pass
+            command_to_run.extend(self._get_credential_parameters())
+            self.log.info("Running the command: `{}`".format(" ".join(command_to_run)))
+            self.sql_proxy_process = Popen(command_to_run,
+                                           stdin=PIPE, stdout=PIPE, stderr=PIPE)
+            self.log.info("The pid of cloud_sql_proxy: {}".format(
+                self.sql_proxy_process.pid))
+            while True:
+                line = self.sql_proxy_process.stderr.readline()
+                return_code = self.sql_proxy_process.poll()
+                if line == '' and return_code is not None:
+                    self.sql_proxy_process = None
+                    raise AirflowException(
+                        "The cloud_sql_proxy finished early with return code {}!".format(
+                            return_code))
+                if line != '':
+                    self.log.info(line)
+                if "googleapi: Error" in line or "invalid instance name:" in line:
+                    self.stop_proxy()
+                    raise AirflowException(
+                        "Error when starting the cloud_sql_proxy {}!".format(
+                            line))
+                if "Ready for new connections" in line:
+                    return
+
+    def stop_proxy(self):
+        """
+        Stops running proxy.
+
+        You should stop the proxy after you stop using it.
+        """
+        if not self.sql_proxy_process:
+            raise AirflowException("The sql proxy is not started yet")
+        else:
+            self.log.info("Stopping the cloud_sql_proxy pid: {}".format(
+                self.sql_proxy_process.pid))
+            self.sql_proxy_process.kill()
+            self.sql_proxy_process = None
+        # Cleanup!
+        self.log.info("Removing the socket directory: {}".
+                      format(self.cloud_sql_proxy_socket_directory))
+        shutil.rmtree(self.cloud_sql_proxy_socket_directory, ignore_errors=True)
+        if self.sql_proxy_was_downloaded:
+            self.log.info("Removing downloaded proxy: {}".format(self.sql_proxy_path))
+            # Silently ignore if the file has already been removed (concurrency)
+            try:
+                os.remove(self.sql_proxy_path)
+            except OSError as e:
+                if not e.errno == errno.ENOENT:
+                    raise
+        else:
+            self.log.info("Skipped removing proxy - it was not downloaded: {}".
+                          format(self.sql_proxy_path))
+        if isfile(self.credentials_path):
+            self.log.info("Removing generated credentials file {}".
+                          format(self.credentials_path))
+            # Here the file cannot be deleted by a concurrent task (it has its own copy)
+            os.remove(self.credentials_path)
+
+    def get_proxy_version(self):
+        """
+        Returns version of the Cloud Sql Proxy.
+        """
+        self._download_sql_proxy_if_needed()
+        command_to_run = [self.sql_proxy_path]
+        command_to_run.extend(['--version'])
+        command_to_run.extend(self._get_credential_parameters())
+        result = subprocess.check_output(command_to_run)
+        pattern = re.compile("^.*[V|v]ersion ([^;]*);.*$")
+        m = pattern.match(result)
+        if m:
+            return m.group(1)
+        else:
+            return None
+
+    def get_socket_path(self):
+        """
+        Retrieves UNIX socket path used by Cloud Sql Proxy.
+
+        :return: The dynamically generated path for the socket created by the proxy.
+        :rtype: str
+        """
+        return self.cloud_sql_proxy_socket_directory + "/" + self.instance_specification
+
+
+CONNECTION_URIS = {
+    "postgres": {
+        "proxy": {
+            "tcp":
+                "postgresql://{user}:{password}@127.0.0.1:{proxy_port}/{database}",
+            "socket":
+                "postgresql://{user}:{password}@{socket_path}/{database}"
+        },
+        "public": {
+            "ssl":
+                "postgresql://{user}:{password}@{public_ip}:{public_port}/{database}?"
+                "sslmode=verify-ca&"
+                "sslcert={client_cert_file}&"
+                "sslkey={client_key_file}&"
+                "sslrootcert={server_ca_file}",
+            "non-ssl":
+                "postgresql://{user}:{password}@{public_ip}:{public_port}/{database}"
+        }
+    },
+    "mysql": {
+        "proxy": {
+            "tcp":
+                "mysql://{user}:{password}@127.0.0.1:{proxy_port}/{database}",
+            "socket":
+                "mysql://{user}:{password}@localhost/{database}?"
+                "unix_socket={socket_path}"
+        },
+        "public": {
+            "ssl":
+                "mysql://{user}:{password}@{public_ip}:{public_port}/{database}?"
+                "ssl={ssl_spec}",
+            "non-ssl":
+                "mysql://{user}:{password}@{public_ip}:{public_port}/{database}"
+        }
+    }
+}
+
+CLOUD_SQL_VALID_DATABASE_TYPES = ['postgres', 'mysql']
+
+
+# noinspection PyAbstractClass
+class CloudSqlDatabaseHook(BaseHook):
+    """
+    Serves DB connection configuration for CloudSQL (Connections
+    of *gcpcloudsql://* type).
+
+    The hook is a "meta" one - it does not perform an actual connection,
+    it is there to retrieve all the parameters configured in gcpcloudsql:// connection,
+    start/stop Cloud Sql Proxy if needed, dynamically generate Postgres or MySQL
+    connection in the database and return an actual Postgres or MySQL hook.
+    The returned Postgres/MySQL hooks are using direct connection or Cloud Sql
+    Proxy socket/tcp as configured.
+
+    Main parameters of the hook are retrieved from the standard URI components:
+
+    * **user** - User name to authenticate to the database (from login of the URI).
+    * **password** - Password to authenticate to the database (from password of the URI)
+    * **public_ip** - IP to connect to for public connection (from host of the URI)
+    * **public_port** - Port to connect to for public connection (from port of the URI)
+    * **database** - Database to connect to (from schema of the URI)
+
+    Remaining parameters are retrieved from the extras (URI query parameters):
+
+    * **project_id** - Google Cloud Platform project where the Cloud SQL instance exists.
+    * **instance** - Name of the Cloud SQL database instance.
+    * **location** - The location of the cloud sql instance (for example europe-west1).
+    * **database_type** - The type of the database instance (mysql or postgres).
+    * **use_proxy** - (default False) Whether SQL proxy should be used to connect to Cloud
+      SQL DB.
+    * **use_ssl** - (default False) Whether SSL should be used to connect to Cloud SQL DB.
+      You cannot use proxy and ssl together.
+    * **sql_proxy_use_tcp** - (default False) If set to true, TCP is used to connect via
+      proxy, otherwise UNIX sockets are used.
+    * **sql_proxy_binary_path** - Optional path to sql proxy binary. If the binary is not
+      specified or the binary is not present, it is automatically downloaded.
+    * **sql_proxy_version** -  Specific version of the proxy to download (for example
+      v1.13). If not specified, the latest version is downloaded.
+    * **sslcert** - Path to client certificate to authenticate when SSL is used.
+    * **sslkey** - Path to client private key to authenticate when SSL is used.
+    * **sslrootcert** - Path to server's certificate to authenticate when SSL is used.
+    """
+    _conn = None
+
+    def __init__(self, gcp_cloudsql_conn_id='google_cloud_sql_default'):
+        super(CloudSqlDatabaseHook, self).__init__(source=None)
+        self.gcp_cloudsql_conn_id = gcp_cloudsql_conn_id
+        self.cloudsql_connection = self.get_connection(self.gcp_cloudsql_conn_id)
+        self.extras = self.cloudsql_connection.extra_dejson
+        self.project_id = self.extras.get('project_id')
+        self.instance = self.extras.get('instance')
+        self.database = self.cloudsql_connection.schema
+        self.location = self.extras.get('location')
+        self.database_type = self.extras.get('database_type')
+        self.use_proxy = self._get_bool(self.extras.get('use_proxy', 'False'))
+        self.use_ssl = self._get_bool(self.extras.get('use_ssl', 'False'))
+        self.sql_proxy_use_tcp = self._get_bool(
+            self.extras.get('sql_proxy_use_tcp', 'False'))
+        self.sql_proxy_version = self.extras.get('sql_proxy_version')
+        self.sql_proxy_binary_path = self.extras.get('sql_proxy_binary_path')
+        self.user = self.cloudsql_connection.login
+        self.password = self.cloudsql_connection.password
+        self.public_ip = self.cloudsql_connection.host
+        self.public_port = self.cloudsql_connection.port
+        self.sslcert = self.extras.get('sslcert')
+        self.sslkey = self.extras.get('sslkey')
+        self.sslrootcert = self.extras.get('sslrootcert')
+        # Port and socket path and db_hook are automatically generated
+        self.sql_proxy_tcp_port = None
+        self.sql_proxy_unique_path = None
+        self.db_hook = None
+        self.reserved_tcp_socket = None
+        # Generated based on clock + clock sequence. Unique per host (!).
+        # This is important as different hosts share the database
+        self.db_conn_id = str(uuid.uuid1())
+        self._validate_inputs()
+
+    @staticmethod
+    def _get_bool(val):
+        if val == 'False':
+            return False
+        return val
+
+    @staticmethod
+    def _check_ssl_file(file_to_check, name):
+        if not file_to_check:
+            raise AirflowException("SSL connections requires {name} to be set".
+                                   format(name=name))
+        if not isfile(file_to_check):
+            raise AirflowException("The {file_to_check} must be a readable file".
+                                   format(file_to_check=file_to_check))
+
+    def _validate_inputs(self):
+        if not self.project_id:
+            raise AirflowException("The required extra 'project_id' is empty")
+        if not self.location:
+            raise AirflowException("The required extra 'location' is empty")
+        if not self.instance:
+            raise AirflowException("The required extra 'instance' is empty")
+        if self.database_type not in CLOUD_SQL_VALID_DATABASE_TYPES:
+            raise AirflowException("Invalid database type '{}'. Must be one of {}".format(
+                self.database_type, CLOUD_SQL_VALID_DATABASE_TYPES
+            ))
+        if self.use_proxy and self.use_ssl:
+            raise AirflowException("Cloud Sql Proxy does not support SSL connections."
+                                   " SSL is not needed as Cloud Sql Proxy "
+                                   "provides encryption on its own")
+        if self.use_ssl:
+            self._check_ssl_file(self.sslcert, "sslcert")
+            self._check_ssl_file(self.sslkey, "sslkey")
+            self._check_ssl_file(self.sslrootcert, "sslrootcert")
+
+    def _generate_unique_path(self):
+        # We are not using mkdtemp here as the path generated with mkdtemp
+        # can be close to 60 characters and there is a limitation in
+        # length of socket path to around 100 characters in total.
+        # We append project/location/instance to it later and postgres
+        # appends its own prefix, so we chose a shorter "/tmp/{uuid1}" - based
+        # on host name and clock + clock sequence. This should be fairly
+        # sufficient for our needs and should even work if the time is set back.
+        # We are using db_conn_id generated with uuid1 so that connection
+        # id matches the folder - for easier debugging.
+        return "/tmp/" + self.db_conn_id
+
+    @staticmethod
+    def _quote(value):
+        return quote_plus(value) if value else None
+
+    def _generate_connection_uri(self):
+        if self.use_proxy:
+            if self.sql_proxy_use_tcp:
+                if not self.sql_proxy_tcp_port:
+                    self.reserve_free_tcp_port()
+            if not self.sql_proxy_unique_path:
+                self.sql_proxy_unique_path = self._generate_unique_path()
+
+        database_uris = CONNECTION_URIS[self.database_type]
+        ssl_spec = None
+        socket_path = None
+        if self.use_proxy:
+            proxy_uris = database_uris['proxy']
+            if self.sql_proxy_use_tcp:
+                format_string = proxy_uris['tcp']
+            else:
+                format_string = proxy_uris['socket']
+                socket_path = \
+                    "{sql_proxy_socket_path}/{instance_socket_name}".format(
+                        sql_proxy_socket_path=self.sql_proxy_unique_path,
+                        instance_socket_name=self._get_instance_socket_name()
+                    )
+        else:
+            public_uris = database_uris['public']
+            if self.use_ssl:
+                format_string = public_uris['ssl']
+                ssl_spec = {
+                    'cert': self.sslcert,
+                    'key': self.sslkey,
+                    'ca': self.sslrootcert
+                }
+            else:
+                format_string = public_uris['non-ssl']
+
+        connection_uri = format_string.format(
+            user=quote_plus(self.user),
+            password=quote_plus(self.password),
+            database=quote_plus(self.database),
+            public_ip=self.public_ip,
+            public_port=self.public_port,
+            proxy_port=self.sql_proxy_tcp_port,
+            socket_path=self._quote(socket_path),
+            ssl_spec=self._quote(json.dumps(ssl_spec)) if ssl_spec else None,
+            client_cert_file=self._quote(self.sslcert),
+            client_key_file=self._quote(self.sslkey),
+            server_ca_file=self._quote(self.sslrootcert)
+        )
+        self.log.info("DB connection URI {}".format(connection_uri.replace(
+            quote_plus(self.password), 'XXXXXXXXXXXX')))
+        return connection_uri
+
+    def _get_instance_socket_name(self):
+        return self.project_id + ":" + self.location + ":" + self.instance
+
+    def _get_sqlproxy_instance_specification(self):
+        instance_specification = self._get_instance_socket_name()
+        if self.sql_proxy_use_tcp:
+            instance_specification += "=tcp:" + str(self.sql_proxy_tcp_port)
+        return instance_specification
+
+    @provide_session
+    def create_connection(self, session=None):
+        """
+        Create a connection in the Connection table, according to whether it uses
+        proxy, TCP, UNIX sockets or SSL. The connection ID will be randomly generated.
+
+        :param session: Session of the SQL Alchemy ORM (automatically generated with
+                        decorator).
+        """
+        connection = Connection(conn_id=self.db_conn_id)
+        uri = self._generate_connection_uri()
+        self.log.info("Creating connection {}".format(self.db_conn_id))
+        connection.parse_from_uri(uri)
+        session.add(connection)
+        session.commit()
+
+    @provide_session
+    def delete_connection(self, session=None):
+        """
+        Delete the dynamically created connection from the Connection table.
+
+        :param session: Session of the SQL Alchemy ORM (automatically generated with
+                        decorator).
+        """
+        self.log.info("Deleting connection {}".format(self.db_conn_id))
+        connection = session.query(models.Connection).filter(
+            models.Connection.conn_id == self.db_conn_id)[0]
+        session.delete(connection)
+        session.commit()
+
+    def get_sqlproxy_runner(self):
+        """
+        Retrieve Cloud Sql Proxy runner. It is used to manage the proxy
+        lifecycle per task.
+
+        :return: The Cloud Sql Proxy runner.
+        :rtype: CloudSqlProxyRunner
+        """
+        return CloudSqlProxyRunner(
+            path_prefix=self.sql_proxy_unique_path,
+            instance_specification=self._get_sqlproxy_instance_specification(),
+            project_id=self.project_id,
+            sql_proxy_version=self.sql_proxy_version,
+            sql_proxy_binary_path=self.sql_proxy_binary_path
+        )
+
+    def get_database_hook(self):
+        """
+        Retrieve database hook - this is the actual Postgres or MySQL database hook
+        that uses proxy or connects directly to the Google Cloud Sql database.
+        """
+        if self.database_type == 'postgres':
+            self.db_hook = PostgresHook(postgres_conn_id=self.db_conn_id,
+                                        schema=self.database)
+        else:
+            self.db_hook = MySqlHook(mysql_conn_id=self.db_conn_id,
+                                     schema=self.database)
+        return self.db_hook
+
+    def cleanup_database_hook(self):
+        """
+        Clean up database hook after it was used.
+        """
+        if self.database_type == 'postgres':
+            for output in self.db_hook.conn.notices:
+                self.log.info(output)
+
+    def reserve_free_tcp_port(self):
+        """
+        Reserve free TCP port to be used by Cloud Sql Proxy
+        """
+        self.reserved_tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
+        self.reserved_tcp_socket.bind(('127.0.0.1', 0))
+        self.sql_proxy_tcp_port = self.reserved_tcp_socket.getsockname()[1]
+
+    def free_reserved_port(self):
+        """
+        Free TCP port - makes it immediately ready to be used by Cloud Sql Proxy.
+        """
+        if self.reserved_tcp_socket:
+            self.reserved_tcp_socket.close()
+            self.reserved_tcp_socket = None
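The CloudSqlProxyRunner added above can, in principle, also be used on its own when a task needs a proxy tunnel outside of the query operator. A hypothetical sketch (project, region and instance values are illustrative; credentials are assumed to come from the default google_cloud_default connection):

    from airflow.contrib.hooks.gcp_sql_hook import CloudSqlProxyRunner

    runner = CloudSqlProxyRunner(
        path_prefix='/tmp/example_sql_proxy',
        instance_specification='example-project:europe-west1:testpostgres=tcp:5432')
    runner.start_proxy()     # downloads the binary if needed and waits until ready
    try:
        pass                 # connect to 127.0.0.1:5432 here
    finally:
        runner.stop_proxy()  # kills the proxy and cleans up downloaded files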
diff --git a/airflow/contrib/operators/gcp_sql_operator.py b/airflow/contrib/operators/gcp_sql_operator.py
index aca2e8ad40..711f2c8d15 100644
--- a/airflow/contrib/operators/gcp_sql_operator.py
+++ b/airflow/contrib/operators/gcp_sql_operator.py
@@ -19,7 +19,7 @@
 from googleapiclient.errors import HttpError
 
 from airflow import AirflowException
-from airflow.contrib.hooks.gcp_sql_hook import CloudSqlHook
+from airflow.contrib.hooks.gcp_sql_hook import CloudSqlHook, CloudSqlDatabaseHook
 from airflow.contrib.utils.gcp_field_validator import GcpBodyFieldValidator
 from airflow.models import BaseOperator
 from airflow.utils.decorators import apply_defaults
@@ -524,3 +524,81 @@ def execute(self, context):
         else:
             return self._hook.delete_database(self.project_id, self.instance,
                                               self.database)
+
+
+class CloudSqlQueryOperator(BaseOperator):
+    """
+    Performs a DML or DDL query on an existing Cloud Sql instance. It optionally uses
+    cloud-sql-proxy to establish a secure connection with the database.
+
+    :param sql: SQL query or list of queries to run (should be DML or DDL queries -
+        this operator does not return any data from the database,
+        so it is useless to pass it DQL queries). Note that it is the responsibility
+        of the query author to make sure that the queries are idempotent. For example
+        you can use CREATE TABLE IF NOT EXISTS to create a table.
+    :type sql: str or [str]
+    :param parameters: (optional) the parameters to render the SQL query with.
+    :type parameters: mapping or iterable
+    :param autocommit: if True, each command is automatically committed.
+        (default value: False)
+    :type autocommit: bool
+    :param gcp_conn_id: The connection ID used to connect to Google Cloud Platform for
+        cloud-sql-proxy authentication.
+    :type gcp_conn_id: str
+    :param gcp_cloudsql_conn_id: The connection ID used to connect to Google Cloud SQL;
+       its schema should be gcpcloudsql://.
+       See :class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlDatabaseHook` for
+       details on how to define a gcpcloudsql:// connection.
+    :type gcp_cloudsql_conn_id: str
+    """
+    # [START gcp_sql_query_template_fields]
+    template_fields = ('sql', 'gcp_cloudsql_conn_id', 'gcp_conn_id')
+    template_ext = ('.sql',)
+    # [END gcp_sql_query_template_fields]
+
+    @apply_defaults
+    def __init__(self,
+                 sql,
+                 autocommit=False,
+                 parameters=None,
+                 gcp_conn_id='google_cloud_default',
+                 gcp_cloudsql_conn_id='google_cloud_sql_default',
+                 *args, **kwargs):
+        super(CloudSqlQueryOperator, self).__init__(*args, **kwargs)
+        self.sql = sql
+        self.gcp_conn_id = gcp_conn_id
+        self.gcp_cloudsql_conn_id = gcp_cloudsql_conn_id
+        self.autocommit = autocommit
+        self.parameters = parameters
+        self.cloudsql_db_hook = CloudSqlDatabaseHook(
+            gcp_cloudsql_conn_id=gcp_cloudsql_conn_id)
+        self.cloud_sql_proxy_runner = None
+        self.database_hook = None
+
+    def pre_execute(self, context):
+        self.cloudsql_db_hook.create_connection()
+        self.database_hook = self.cloudsql_db_hook.get_database_hook()
+        if self.cloudsql_db_hook.use_proxy:
+            self.cloud_sql_proxy_runner = self.cloudsql_db_hook.get_sqlproxy_runner()
+            self.cloudsql_db_hook.free_reserved_port()
+            # There is a very, very slim chance that the port will be taken over
+            # here by another bind(0). It's quite unlikely to happen though!
+            self.cloud_sql_proxy_runner.start_proxy()
+
+    def execute(self, context):
+        self.log.info('Executing: "%s"', self.sql)
+        self.database_hook.run(self.sql, self.autocommit, parameters=self.parameters)
+
+    def post_execute(self, context, result=None):
+        # Make sure that all the cleanups happen, no matter whether
+        # exceptions are thrown
+        try:
+            self.cloudsql_db_hook.cleanup_database_hook()
+        finally:
+            try:
+                if self.cloud_sql_proxy_runner:
+                    self.cloud_sql_proxy_runner.stop_proxy()
+                    self.cloud_sql_proxy_runner = None
+            finally:
+                self.cloudsql_db_hook.delete_connection()
+                self.cloudsql_db_hook = None
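
A minimal usage sketch for the operator above; the DAG id and the connection id
'my_gcpcloudsql_conn' are hypothetical and assume that a gcpcloudsql:// connection
with that id has already been defined::

    import airflow
    from airflow import models
    from airflow.contrib.operators.gcp_sql_operator import CloudSqlQueryOperator

    default_args = {'start_date': airflow.utils.dates.days_ago(1)}

    with models.DAG('example_cloudsql_query_usage',
                    default_args=default_args,
                    schedule_interval=None) as dag:
        # 'my_gcpcloudsql_conn' must point to an existing gcpcloudsql:// connection,
        # e.g. one defined via the AIRFLOW_CONN_MY_GCPCLOUDSQL_CONN environment variable.
        create_table = CloudSqlQueryOperator(
            task_id='create_table',
            gcp_cloudsql_conn_id='my_gcpcloudsql_conn',
            sql='CREATE TABLE IF NOT EXISTS test_table (i INTEGER)',
        )
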
diff --git a/airflow/models.py b/airflow/models.py
index fa33609852..1cd5ae48be 100755
--- a/airflow/models.py
+++ b/airflow/models.py
@@ -667,6 +667,7 @@ class Connection(Base, LoggingMixin):
         ('azure_data_lake', 'Azure Data Lake'),
         ('cassandra', 'Cassandra',),
         ('qubole', 'Qubole'),
+        ('gcpcloudsql', 'Google Cloud SQL'),
     ]
 
     def __init__(
@@ -807,6 +808,9 @@ def get_hook(self):
             elif self.conn_type == 'cassandra':
                 from airflow.contrib.hooks.cassandra_hook import CassandraHook
                 return CassandraHook(cassandra_conn_id=self.conn_id)
+            elif self.conn_type == 'gcpcloudsql':
+                from airflow.contrib.hooks.gcp_sql_hook import CloudSqlDatabaseHook
+                return CloudSqlDatabaseHook(gcp_cloudsql_conn_id=self.conn_id)
         except Exception:
             pass
 
@@ -2504,7 +2508,6 @@ def __init__(
                     c=self.__class__.__name__, a=args, k=kwargs),
                 category=PendingDeprecationWarning
             )
-
         validate_key(task_id)
         self.task_id = task_id
         self.owner = owner
diff --git a/docs/howto/manage-connections.rst b/docs/howto/manage-connections.rst
index 4d0adfb100..b5a97eabc7 100644
--- a/docs/howto/manage-connections.rst
+++ b/docs/howto/manage-connections.rst
@@ -136,7 +136,7 @@ Scopes (comma separated)
 
 MySQL
 ~~~~~
-The MySQL connect type allows to connect with MySQL database.
+The MySQL connection type provides connection to a MySQL database.
 
 Configuring the Connection
 ''''''''''''''''''''''''''
@@ -211,3 +211,125 @@ Extra (optional)
     .. note::
         If encounter UnicodeDecodeError while working with MySQL connection check
         the charset defined is matched to the database charset.
+
+Postgres
+~~~~~~~~
+The Postgres connection type provides connection to a Postgres database.
+
+Configuring the Connection
+''''''''''''''''''''''''''
+Host (required)
+    The host to connect to.
+
+Schema (optional)
+    Specify the schema name to be used in the database.
+
+Login (required)
+    Specify the user name to connect.
+
+Password (required)
+    Specify the password to connect.
+
+Extra (optional)
+    Specify the extra parameters (as a json dictionary) that can be used in the
+    Postgres connection. The following parameters are supported:
+
+    * **sslmode** - This option determines whether or with what priority a secure SSL
+      TCP/IP connection will be negotiated with the server. There are six modes:
+      'disable', 'allow', 'prefer', 'require', 'verify-ca', 'verify-full'.
+    * **sslcert** - This parameter specifies the file name of the client SSL certificate,
+      replacing the default.
+    * **sslkey** - This parameter specifies the file name of the client SSL key,
+      replacing the default.
+    * **sslrootcert** - This parameter specifies the name of a file containing SSL
+      certificate authority (CA) certificate(s).
+    * **sslcrl** - This parameter specifies the file name of the SSL certificate
+      revocation list (CRL).
+    * **application_name** - Specifies a value for the application_name
+      configuration parameter.
+    * **keepalives_idle** - Controls the number of seconds of inactivity after which TCP
+      should send a keepalive message to the server.
+
+    More details on all Postgres parameters supported can be found in
+    `Postgres documentation <https://www.postgresql.org/docs/current/static/libpq-connect.html#LIBPQ-CONNSTRING>`_
+
+    Example "extras" field:
+
+    .. code-block:: json
+
+       {
+          "sslmode": "verify-ca",
+          "sslcert": "/tmp/client-cert.pem",
+          "sslrootcert": "/tmp/server-ca.pem",
+          "sslkey": "/tmp/client-key.pem"
+       }
+
+    When specifying the connection as a URI (in the AIRFLOW_CONN_* variable) you should
+    specify it following the standard syntax of DB connections, where extras are passed
+    as parameters of the URI (note that all components of the URI should be URL-encoded).
+
+    For example:
+
+    .. code-block:: bash
+
+        postgresql://postgres_user:XXXXXXXXXXXX@1.1.1.1:5432/postgresdb?sslmode=verify-ca&sslcert=%2Ftmp%2Fclient-cert.pem&sslkey=%2Ftmp%2Fclient-key.pem&sslrootcert=%2Ftmp%2Fserver-ca.pem
+
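A rough sketch of how such a URL-encoded URI could be assembled in Python; the
credentials and file paths below are placeholders only::

    import urllib.parse

    extras = {
        'sslmode': 'verify-ca',
        'sslcert': '/tmp/client-cert.pem',
        'sslkey': '/tmp/client-key.pem',
        'sslrootcert': '/tmp/server-ca.pem',
    }
    # urlencode() quotes each value, so the file paths end up URL-encoded
    # (e.g. '/' becomes '%2F') as required for AIRFLOW_CONN_* URIs.
    query = urllib.parse.urlencode(extras)
    uri = 'postgresql://postgres_user:{}@1.1.1.1:5432/postgresdb?{}'.format(
        urllib.parse.quote_plus('XXXXXXXXXXXX'), query)
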
+Cloudsql
+~~~~~~~~
+The gcpcloudsql:// connection is used by
+:class:`airflow.contrib.operators.gcp_sql_operator.CloudSqlQueryOperator` to perform
+queries on a Google Cloud SQL database. A Google Cloud SQL database can be either
+Postgres or MySQL, so this is a "meta" connection type - it introduces a common schema
+for both MySQL and Postgres, including what kind of connectivity should be used.
+Google Cloud SQL supports connecting via public IP or via Cloud SQL Proxy.
+In the latter case the
+:class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlDatabaseHook` uses
+:class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlProxyRunner` to automatically prepare
+and use a temporary Postgres or MySQL connection that will use the proxy to connect
+(either via TCP or a UNIX socket).
+
+Configuring the Connection
+''''''''''''''''''''''''''
+Host (required)
+    The host to connect to.
+
+Schema (optional)
+    Specify the schema name to be used in the database.
+
+Login (required)
+    Specify the user name to connect.
+
+Password (required)
+    Specify the password to connect.
+
+Extra (optional)
+    Specify the extra parameters (as a json dictionary) that can be used in the
+    gcpcloudsql connection.
+
+    Details of all the parameters supported in the extra field can be found in
+    :class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlDatabaseHook`.
+
+    Example "extras" field:
+
+    .. code-block:: json
+
+       {
+          "database_type": "mysql",
+          "project_id": "example-project",
+          "location": "europe-west1",
+          "instance": "testinstance",
+          "use_proxy": true,
+          "sql_proxy_use_tcp": false
+       }
+
+    When specifying the connection as a URI (in the AIRFLOW_CONN_* variable) you should
+    specify it following the standard syntax of DB connections, where extras are passed
+    as parameters of the URI (note that all components of the URI should be URL-encoded).
+
+    For example:
+
+    .. code-block:: bash
+
+        gcpcloudsql://user:XXXXXXXXX@1.1.1.1:3306/mydb?database_type=mysql&project_id=example-project&location=europe-west1&instance=testinstance&use_proxy=True&sql_proxy_use_tcp=False
+
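The same connection could, for instance, be exposed to Airflow through an environment
variable; the variable name below (and hence the resulting connection id
'public_mysql_tcp') is purely illustrative::

    import os

    os.environ['AIRFLOW_CONN_PUBLIC_MYSQL_TCP'] = (
        'gcpcloudsql://user:XXXXXXXXX@1.1.1.1:3306/mydb?'
        'database_type=mysql&project_id=example-project&location=europe-west1&'
        'instance=testinstance&use_proxy=False&use_ssl=False')
    # A CloudSqlQueryOperator created with gcp_cloudsql_conn_id='public_mysql_tcp'
    # would then resolve this connection definition.
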
diff --git a/docs/howto/operator.rst b/docs/howto/operator.rst
index 806eaa99d9..63e593a2b6 100644
--- a/docs/howto/operator.rst
+++ b/docs/howto/operator.rst
@@ -117,8 +117,7 @@ to start an existing Google Compute Engine instance.
 Arguments
 """""""""
 
-The following examples of OS environment variables show how you can build function name
-to use in the operator and build default args to pass them to multiple tasks:
+The following examples show OS environment variables used to pass arguments to the operator:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
     :language: python
@@ -160,8 +159,7 @@ For parameter definition take a look at
 Arguments
 """""""""
 
-The following examples of OS environment variables show how you can build function name
-to use in the operator and build default args to pass them to multiple tasks:
+The following examples show OS environment variables used to pass arguments to the operator:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
    :language: python
@@ -203,8 +201,7 @@ For parameter definition take a look at
 Arguments
 """""""""
 
-The following examples of OS environment variables show how you can build function name
-to use in the operator and build default args to pass them to multiple tasks:
+The following examples show OS environment variables used to pass arguments to the operator:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute.py
     :language: python
@@ -253,8 +250,7 @@ For parameter definition take a look at
 Arguments
 """""""""
 
-The following examples of OS environment variables show how you can build parameters
-passed to the operator and build default args to pass them to multiple tasks:
+The following examples show OS environment variables used to pass arguments to the operator:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py
     :language: python
@@ -300,8 +296,7 @@ For parameter definition take a look at
 Arguments
 """""""""
 
-The following examples of OS environment variables show how you can build parameters
-passed to the operator and build default args to pass them to multiple tasks:
+The following examples show OS environment variables used to pass arguments to the operator:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_compute_igm.py
     :language: python
@@ -358,7 +353,7 @@ Arguments
 """""""""
 
 The following examples of OS environment variables show how you can build function name
-to use in the operator and build default args to pass them to multiple tasks:
+to use in the operator:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_delete.py
     :language: python
@@ -424,8 +419,8 @@ For parameter definition take a look at
 Arguments
 """""""""
 
-The following examples of OS environment variables show various variants and combinations
-of default_args that you can use. The variables are defined as follows:
+The following examples of OS environment variables show several variants of args you can
+use with the operator:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
     :language: python
@@ -444,18 +439,19 @@ arguments common with other tasks:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_function_deploy_delete.py
     :language: python
-    :dedent: 4
-    :start-after: [START howto_operator_gcf_deploy_args]
-    :end-before: [END howto_operator_gcf_deploy_args]
+    :start-after: [START howto_operator_gcf_default_args]
+    :end-before: [END howto_operator_gcf_default_args]
 
 Note that neither the body nor the default args are complete in the above examples.
-Depending on the set variables, there might be different variants on how to pass source
+Depending on the variables set, there might be different variants on how to pass source
 code related fields. Currently, you can pass either sourceArchiveUrl, sourceRepository
 or sourceUploadUrl as described in the
 `Cloud Functions API specification <https://cloud.google.com/functions/docs/reference/rest/v1/projects.locations.functions#CloudFunction>`_.
-Additionally, default_args might contain zip_path parameter to run the extra step of
-uploading the source code before deploying it. In the last case, you also need to
-provide an empty `sourceUploadUrl` parameter in the body.
+
+Additionally, default_args or direct operator args might contain zip_path parameter
+to run the extra step of uploading the source code before deploying it.
+In this case, you also need to provide an empty `sourceUploadUrl`
+parameter in the body.
 
 Using the operator
 """"""""""""""""""
@@ -675,7 +671,7 @@ For parameter definition take a look at
 Arguments
 """""""""
 
-Some arguments in the example DAG are taken from environment variables:
+Some arguments in the example DAG are taken from OS environment variables:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
     :language: python
@@ -722,7 +718,7 @@ will succeed.
 Arguments
 """""""""
 
-Some arguments in the example DAG are taken from environment variables:
+Some arguments in the example DAG are taken from OS environment variables:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
     :language: python
@@ -778,7 +774,7 @@ unchanged.
 Arguments
 """""""""
 
-Some arguments in the example DAG are taken from environment variables:
+Some arguments in the example DAG are taken from OS environment variables:
 
 .. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql.py
     :language: python
@@ -815,3 +811,96 @@ More information
 
 See `Google Cloud SQL API documentation for patch <https://cloud.google
 .com/sql/docs/mysql/admin-api/v1beta4/instances/patch>`_.
+
+
+CloudSqlQueryOperator
+^^^^^^^^^^^^^^^^^^^^^
+
+Performs DDL or DML SQL queries in a Google Cloud SQL instance. DQL
+(retrieving data from Google Cloud SQL) is not supported - you may run SELECT
+queries, but their results are discarded.
+
+You can specify various connectivity methods to connect to the running instance -
+a plain connection via public IP, a public IP connection with SSL, or a TCP or
+UNIX socket connection via Cloud SQL Proxy. The proxy is downloaded and
+started/stopped dynamically as needed by the operator.
+
+There is a *gcpcloudsql://* connection type that you should use to define what
+kind of connectivity you want the operator to use. The connection is a "meta"
+type of connection. It is not used to make an actual connection on its own, but it
+determines whether Cloud SQL Proxy should be started by `CloudSqlDatabaseHook`
+and what kind of database connection (Postgres or MySQL) should be created
+dynamically - to connect to Cloud SQL either via public IP address or via the proxy.
+The `CloudSqlDatabaseHook` uses
+:class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlProxyRunner` to manage the Cloud SQL
+Proxy lifecycle (each task has its own Cloud SQL Proxy).
+
+When you build the connection, you should use the connection parameters described in
+:class:`~airflow.contrib.hooks.gcp_sql_hook.CloudSqlDatabaseHook`. You can see
+examples of connections below for all the possible types of connectivity. Such a
+connection can be reused between different tasks (instances of `CloudSqlQueryOperator`) -
+each task will get its own proxy started if needed, with its own TCP or UNIX socket.
+
+For parameter definition take a look at
+:class:`~airflow.contrib.operators.gcp_sql_operator.CloudSqlQueryOperator`.
+
+Since the query operator can run arbitrary queries, it cannot be guaranteed to be
+idempotent. It is up to the SQL query designer to design the queries to be idempotent.
+For example, both Postgres and MySQL support CREATE TABLE IF NOT EXISTS statements
+that can be used to create tables in an idempotent way.
+
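An illustrative value for the ``sql`` argument, written so that each statement can
safely be re-run (the table and data below are made up for the example)::

    sql = [
        'CREATE TABLE IF NOT EXISTS test_table (id INT PRIMARY KEY, name VARCHAR(20))',
        'DELETE FROM test_table WHERE id = 1',
        "INSERT INTO test_table (id, name) VALUES (1, 'example')",
    ]
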
+Arguments
+"""""""""
+
+If you define the connection via an `AIRFLOW_CONN_*` URL in an environment
+variable, make sure all components of the URL are URL-encoded.
+See the examples below for details.
+
+Note that in the case of SSL connections you need a mechanism that makes the
+certificate/key files available at predefined locations for all the workers on
+which the operator can run. This can be provided, for example, by mounting
+NFS-like volumes at the same path on all the workers.
+
+Some arguments in the example DAG are taken from the OS environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql_query.py
+      :language: python
+      :start-after: [START howto_operator_cloudsql_query_arguments]
+      :end-before: [END howto_operator_cloudsql_query_arguments]
+
+Example connection definitions for all connectivity cases. Note that all the components
+of the connection URI should be URL-encoded:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql_query.py
+      :language: python
+      :start-after: [START howto_operator_cloudsql_query_connections]
+      :end-before: [END howto_operator_cloudsql_query_connections]
+
+Using the operator
+""""""""""""""""""
+
+The example operators below use all the connectivity options. Note that the connection
+id used by the operator matches the `AIRFLOW_CONN_*` postfix in uppercase - this is the
+standard Airflow notation for defining connections via environment variables:
+
+.. literalinclude:: ../../airflow/contrib/example_dags/example_gcp_sql_query.py
+      :language: python
+      :start-after: [START howto_operator_cloudsql_query_operators]
+      :end-before: [END howto_operator_cloudsql_query_operators]
+
+Templating
+""""""""""
+
+.. literalinclude:: ../../airflow/contrib/operators/gcp_sql_operator.py
+    :language: python
+    :dedent: 4
+    :start-after: [START gcp_sql_query_template_fields]
+    :end-before: [END gcp_sql_query_template_fields]
+
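Since ``sql`` is a templated field and ``.sql`` is a registered template extension,
the query can also be kept in a separate, Jinja-templated file. A rough sketch, with
the file name ``example.sql`` and the connection id being hypothetical::

    validate = CloudSqlQueryOperator(
        task_id='validate',
        gcp_cloudsql_conn_id='my_gcpcloudsql_conn',
        # example.sql is looked up in the DAG's template search path and rendered
        # with Jinja before execution, e.g. it may reference {{ ds }}.
        sql='example.sql',
    )
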
+More information
+""""""""""""""""
+
+See `Google Cloud SQL Proxy documentation
+<https://cloud.google.com/sql/docs/postgres/sql-proxy>`_
+for details about the Cloud SQL Proxy.
+
diff --git a/docs/integration.rst b/docs/integration.rst
index 46ea436bd7..1f5a601d8e 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -516,63 +516,83 @@ Cloud SQL Operators
 """""""""""""""""""
 
 - :ref:`CloudSqlInstanceDatabaseDeleteOperator` : deletes a database from a Cloud SQL
-instance.
+  instance.
 - :ref:`CloudSqlInstanceDatabaseCreateOperator` : creates a new database inside a Cloud
-SQL instance.
+  SQL instance.
 - :ref:`CloudSqlInstanceDatabasePatchOperator` : updates a database inside a Cloud
-SQL instance.
+  SQL instance.
 - :ref:`CloudSqlInstanceDeleteOperator` : delete a Cloud SQL instance.
 - :ref:`CloudSqlInstanceCreateOperator` : create a new Cloud SQL instance.
 - :ref:`CloudSqlInstancePatchOperator` : patch a Cloud SQL instance.
+- :ref:`CloudSqlQueryOperator` : run query in a Cloud SQL instance.
 
-.. CloudSqlInstanceDatabaseDeleteOperator:
+.. _CloudSqlInstanceDatabaseDeleteOperator:
 
 CloudSqlInstanceDatabaseDeleteOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseDeleteOperator
 
-.. CloudSqlInstanceDatabaseCreateOperator:
+.. _CloudSqlInstanceDatabaseCreateOperator:
 
 CloudSqlInstanceDatabaseCreateOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabaseCreateOperator
 
-.. CloudSqlInstanceDatabasePatchOperator:
+.. _CloudSqlInstanceDatabasePatchOperator:
 
 CloudSqlInstanceDatabasePatchOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDatabasePatchOperator
 
-.. CloudSqlInstanceDeleteOperator:
+.. _CloudSqlInstanceDeleteOperator:
 
 CloudSqlInstanceDeleteOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceDeleteOperator
 
-.. CloudSqlInstanceCreateOperator:
+.. _CloudSqlInstanceCreateOperator:
 
 CloudSqlInstanceCreateOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstanceCreateOperator
 
-.. CloudSqlInstancePatchOperator:
+.. _CloudSqlInstancePatchOperator:
 
 CloudSqlInstancePatchOperator
 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
 
 .. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlInstancePatchOperator
 
-Cloud SQL Hook
-""""""""""""""
+.. _CloudSqlQueryOperator:
+
+CloudSqlQueryOperator
+^^^^^^^^^^^^^^^^^^^^^
+.. autoclass:: airflow.contrib.operators.gcp_sql_operator.CloudSqlQueryOperator
+
+Cloud SQL Hooks
+"""""""""""""""
+
+.. _CloudSqlHook:
 
 .. autoclass:: airflow.contrib.hooks.gcp_sql_hook.CloudSqlHook
     :members:
 
+.. _CloudSqlDatabaseHook:
+
+.. autoclass:: airflow.contrib.hooks.gcp_sql_hook.CloudSqlDatabaseHook
+    :members:
+
+.. _CloudSqlProxyRunner:
+
+.. autoclass:: airflow.contrib.hooks.gcp_sql_hook.CloudSqlProxyRunner
+    :members:
+
+
 Compute Engine
 ''''''''''''''
 
diff --git a/tests/contrib/operators/test_gcp_base.py b/tests/contrib/operators/test_gcp_base.py
new file mode 100644
index 0000000000..60e5abeb9f
--- /dev/null
+++ b/tests/contrib/operators/test_gcp_base.py
@@ -0,0 +1,174 @@
+# -*- coding: utf-8 -*-
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+import json
+import os
+import subprocess
+import unittest
+
+from airflow import models, settings, configuration, AirflowException
+from airflow.utils.timezone import datetime
+
+DEFAULT_DATE = datetime(2015, 1, 1)
+
+KEYPATH_EXTRA = 'extra__google_cloud_platform__key_path'
+KEYFILE_DICT_EXTRA = 'extra__google_cloud_platform__keyfile_dict'
+SCOPE_EXTRA = 'extra__google_cloud_platform__scope'
+PROJECT_EXTRA = 'extra__google_cloud_platform__project'
+
+AIRFLOW_MAIN_FOLDER = os.path.realpath(os.path.join(
+    os.path.dirname(os.path.realpath(__file__)),
+    os.pardir, os.pardir, os.pardir))
+
+CONTRIB_OPERATORS_EXAMPLES_DAG_FOLDER = os.path.join(
+    AIRFLOW_MAIN_FOLDER, "airflow", "contrib", "example_dags")
+
+OPERATORS_EXAMPLES_DAG_FOLDER = os.path.join(
+    AIRFLOW_MAIN_FOLDER, "airflow", "example_dags")
+
+TESTS_DAG_FOLDER = os.path.join(
+    AIRFLOW_MAIN_FOLDER, "tests", "dags")
+
+GCP_FOLDER_ENVIRONMENT_VARIABLE = "GCP_SERVICE_ACCOUNT_KEY_FOLDER"
+
+GCP_COMPUTE_KEY = 'gcp_compute.json'
+GCP_FUNCTION_KEY = 'gcp_function.json'
+GCP_CLOUDSQL_KEY = 'gcp_cloudsql.json'
+GCP_BIGTABLE_KEY = 'gcp_bigtable.json'
+GCP_SPANNER_KEY = 'gcp_spanner.json'
+
+SKIP_TEST_WARNING = """
+The test is only run when there is a GCP connection available.
+Set the GCP_SERVICE_ACCOUNT_KEY_FOLDER environment variable if
+you want to run it.
+"""
+
+
+class BaseGcpIntegrationTestCase(unittest.TestCase):
+    def __init__(self,
+                 method_name,
+                 dag_id,
+                 gcp_key,
+                 dag_name=None,
+                 example_dags_folder=CONTRIB_OPERATORS_EXAMPLES_DAG_FOLDER,
+                 project_extra=None):
+        super(BaseGcpIntegrationTestCase, self).__init__(method_name)
+        self.dag_id = dag_id
+        self.dag_name = self.dag_id + '.py' if not dag_name else dag_name
+        self.gcp_key = gcp_key
+        self.example_dags_folder = example_dags_folder
+        self.project_extra = project_extra
+        self.full_key_path = None
+
+    def _gcp_authenticate(self):
+        key_dir_path = os.environ['GCP_SERVICE_ACCOUNT_KEY_FOLDER']
+        self.full_key_path = os.path.join(key_dir_path, self.gcp_key)
+
+        if not os.path.isfile(self.full_key_path):
+            raise Exception("The key {} could not be found. Please copy it to the "
+                            "{} folder.".format(self.gcp_key, key_dir_path))
+        print("Setting the GCP key to {}".format(self.full_key_path))
+        # Checking if we can authenticate using service account credentials provided
+        retcode = subprocess.call(['gcloud', 'auth', 'activate-service-account',
+                                   '--key-file={}'.format(self.full_key_path)])
+        if retcode != 0:
+            raise AirflowException("The gcloud auth method was not successful!")
+        self.update_connection_with_key_path()
+        # Now we revoke all authentication here because we want to make sure
+        # that all works fine with the credentials retrieved from the gcp_connection
+        subprocess.call(['gcloud', 'auth', 'revoke'])
+
+    def update_connection_with_key_path(self):
+        session = settings.Session()
+        try:
+            conn = session.query(models.Connection).filter(
+                models.Connection.conn_id == 'google_cloud_default')[0]
+            extras = conn.extra_dejson
+            extras[KEYPATH_EXTRA] = self.full_key_path
+            if extras.get(KEYFILE_DICT_EXTRA):
+                del extras[KEYFILE_DICT_EXTRA]
+            extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
+            extras[PROJECT_EXTRA] = self.project_extra
+            conn.extra = json.dumps(extras)
+            session.commit()
+        except BaseException as e:
+            print('Airflow DB Session error: ' + str(e))
+            session.rollback()
+            raise
+        finally:
+            session.close()
+
+    def update_connection_with_dictionary(self):
+        session = settings.Session()
+        try:
+            conn = session.query(models.Connection).filter(
+                models.Connection.conn_id == 'google_cloud_default')[0]
+            extras = conn.extra_dejson
+            with open(self.full_key_path, "r") as f:
+                content = json.load(f)
+            extras[KEYFILE_DICT_EXTRA] = json.dumps(content)
+            if extras.get(KEYPATH_EXTRA):
+                del extras[KEYPATH_EXTRA]
+            extras[SCOPE_EXTRA] = 'https://www.googleapis.com/auth/cloud-platform'
+            extras[PROJECT_EXTRA] = self.project_extra
+            conn.extra = json.dumps(extras)
+            session.commit()
+        except BaseException as e:
+            print('Airflow DB Session error: ' + str(e))
+            session.rollback()
+            raise
+        finally:
+            session.close()
+
+    def _symlink_dag(self):
+        target_path = os.path.join(TESTS_DAG_FOLDER, self.dag_name)
+        if os.path.exists(target_path):
+            os.remove(target_path)
+        os.symlink(
+            os.path.join(self.example_dags_folder, self.dag_name),
+            os.path.join(target_path))
+
+    def _rm_symlink_dag(self):
+        os.remove(os.path.join(TESTS_DAG_FOLDER, self.dag_name))
+
+    def _run_dag(self):
+        dag_bag = models.DagBag(dag_folder=TESTS_DAG_FOLDER, include_examples=False)
+        self.args = {'owner': 'airflow', 'start_date': DEFAULT_DATE}
+        dag = dag_bag.get_dag(self.dag_id)
+        dag.clear(reset_dag_runs=True)
+        dag.run(ignore_first_depends_on_past=True, verbose=True)
+
+    def setUp(self):
+        configuration.conf.load_test_config()
+        self._gcp_authenticate()
+        self._symlink_dag()
+
+    def tearDown(self):
+        self._rm_symlink_dag()
+
+    @staticmethod
+    def skip_check(key):
+        if GCP_FOLDER_ENVIRONMENT_VARIABLE not in os.environ:
+            return True
+        key_folder = os.environ[GCP_FOLDER_ENVIRONMENT_VARIABLE]
+        if not os.path.isdir(key_folder):
+            return True
+        key_path = os.path.join(key_folder, key)
+        if not os.path.isfile(key_path):
+            return True
+        return False
diff --git a/tests/contrib/operators/test_gcp_compute_operator.py b/tests/contrib/operators/test_gcp_compute_operator.py
index e8f9bf0165..4a4e336b7c 100644
--- a/tests/contrib/operators/test_gcp_compute_operator.py
+++ b/tests/contrib/operators/test_gcp_compute_operator.py
@@ -340,13 +340,13 @@ def test_set_machine_type_should_throw_ex_when_missing_machine_type(self, mock_h
     MOCK_OP_RESPONSE = "{'kind': 'compute#operation', 'id': '8529919847974922736', " \
                        "'name': " \
                        "'operation-1538578207537-577542784f769-7999ab71-94f9ec1d', " \
-                       "'zone': 'https://www.googleapis.com/compute/v1/projects/polidea" \
-                       "-airflow/zones/europe-west3-b', 'operationType': " \
+                       "'zone': 'https://www.googleapis.com/compute/v1/projects/example" \
+                       "-project/zones/europe-west3-b', 'operationType': " \
                        "'setMachineType', 'targetLink': " \
-                       "'https://www.googleapis.com/compute/v1/projects/example-airflow" \
+                       "'https://www.googleapis.com/compute/v1/projects/example-project" \
                        "/zones/europe-west3-b/instances/pa-1', 'targetId': " \
                        "'2480086944131075860', 'status': 'DONE', 'user': " \
-                       "'uberdarek@example-airflow.iam.gserviceaccount.com', " \
+                       "'service-account@example-project.iam.gserviceaccount.com', " \
                        "'progress': 100, 'insertTime': '2018-10-03T07:50:07.951-07:00', "\
                        "'startTime': '2018-10-03T07:50:08.324-07:00', 'endTime': " \
                        "'2018-10-03T07:50:08.484-07:00', 'error': {'errors': [{'code': " \
@@ -354,7 +354,7 @@ def test_set_machine_type_should_throw_ex_when_missing_machine_type(self, mock_h
                        "'machine-type-1' does not exist in zone 'europe-west3-b'.\"}]}, "\
                        "'httpErrorStatusCode': 400, 'httpErrorMessage': 'BAD REQUEST', " \
                        "'selfLink': " \
-                       "'https://www.googleapis.com/compute/v1/projects/example-airflow" \
+                       "'https://www.googleapis.com/compute/v1/projects/example-project" \
                        "/zones/europe-west3-b/operations/operation-1538578207537" \
                        "-577542784f769-7999ab71-94f9ec1d'} "
 
diff --git a/tests/contrib/operators/test_gcp_sql_operator.py b/tests/contrib/operators/test_gcp_sql_operator.py
index 31ed3d37c3..516fcef4aa 100644
--- a/tests/contrib/operators/test_gcp_sql_operator.py
+++ b/tests/contrib/operators/test_gcp_sql_operator.py
@@ -16,14 +16,23 @@
 # KIND, either express or implied.  See the License for the
 # specific language governing permissions and limitations
 # under the License.
-
+import json
+import os
+import time
 import unittest
+from uuid import uuid1
+
+from parameterized import parameterized
 
 from airflow import AirflowException
+from airflow.contrib.hooks.gcp_sql_hook import CloudSqlProxyRunner
 from airflow.contrib.operators.gcp_sql_operator import CloudSqlInstanceCreateOperator, \
     CloudSqlInstancePatchOperator, CloudSqlInstanceDeleteOperator, \
     CloudSqlInstanceDatabaseCreateOperator, CloudSqlInstanceDatabasePatchOperator, \
-    CloudSqlInstanceDatabaseDeleteOperator
+    CloudSqlInstanceDatabaseDeleteOperator, CloudSqlQueryOperator
+from airflow.models import Connection
+from tests.contrib.operators.test_gcp_base import BaseGcpIntegrationTestCase, \
+    GCP_CLOUDSQL_KEY, SKIP_TEST_WARNING
 
 try:
     # noinspection PyProtectedMember
@@ -34,9 +43,10 @@
     except ImportError:
         mock = None
 
-PROJECT_ID = "project-id"
-INSTANCE_NAME = "test-name"
-DB_NAME = "db1"
+PROJECT_ID = os.environ.get('PROJECT_ID', 'project-id')
+INSTANCE_NAME = os.environ.get('INSTANCE_NAME', 'test-name')
+DB_NAME = os.environ.get('DB_NAME', 'db1')
+
 CREATE_BODY = {
     "name": INSTANCE_NAME,
     "settings": {
@@ -218,7 +228,7 @@ def test_create_should_validate_list_type(self, mock_hook):
                 "tier": "db-n1-standard-1",
                 "ipConfiguration": {
                     "authorizedNetworks": {}  # Should be a list, not a dict.
-                                              # Testing if the validation catches this.
+                    # Testing if the validation catches this.
                 }
             }
         }
@@ -242,7 +252,7 @@ def test_create_should_validate_non_empty_fields(self, mock_hook):
             "name": INSTANCE_NAME,
             "settings": {
                 "tier": "",  # Field can't be empty (defined in CLOUD_SQL_VALIDATION).
-                             # Testing if the validation catches this.
+                # Testing if the validation catches this.
             }
         }
         with self.assertRaises(AirflowException) as cm:
@@ -318,7 +328,9 @@ def test_instance_delete(self, mock_hook, _check_if_instance_exists):
                 ".CloudSqlInstanceDeleteOperator._check_if_instance_exists")
     @mock.patch("airflow.contrib.operators.gcp_sql_operator.CloudSqlHook")
     def test_instance_delete_should_abort_and_succeed_if_not_exists(
-            self, mock_hook, _check_if_instance_exists):
+            self,
+            mock_hook,
+            _check_if_instance_exists):
         _check_if_instance_exists.return_value = False
         op = CloudSqlInstanceDeleteOperator(
             project_id=PROJECT_ID,
@@ -462,3 +474,329 @@ def test_instance_db_delete_should_abort_and_succeed_if_not_exists(
         mock_hook.assert_called_once_with(api_version="v1beta4",
                                           gcp_conn_id="google_cloud_default")
         mock_hook.return_value.delete_database.assert_not_called()
+
+
+class CloudSqlQueryValidationTest(unittest.TestCase):
+    @parameterized.expand([
+        ('', 'location', 'instance_name', 'postgres', False, False,
+         'SELECT * FROM TEST',
+         "The required extra 'project_id' is empty"),
+        ('project_id', '', 'instance_name', 'mysql', False, False,
+         'SELECT * FROM TEST',
+         "The required extra 'location' is empty"),
+        ('project_id', 'location', '', 'postgres', False, False,
+         'SELECT * FROM TEST',
+         "The required extra 'instance' is empty"),
+        ('project_id', 'location', 'instance_name', 'wrong', False, False,
+         'SELECT * FROM TEST',
+         "Invalid database type 'wrong'. Must be one of ['postgres', 'mysql']"),
+        ('project_id', 'location', 'instance_name', 'postgres', True, True,
+         'SELECT * FROM TEST',
+         "Cloud Sql Proxy does not support SSL connections. SSL is not needed as"
+         " Cloud Sql Proxy provides encryption on its own"),
+        ('project_id', 'location', 'instance_name', 'postgres', False, True,
+         'SELECT * FROM TEST',
+         "SSL connections requires sslcert to be set"),
+    ])
+    @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
+    def test_create_operator_with_wrong_parameters(self,
+                                                   project_id,
+                                                   location,
+                                                   instance_name,
+                                                   database_type,
+                                                   use_proxy,
+                                                   use_ssl,
+                                                   sql,
+                                                   message,
+                                                   get_connections):
+        connection = Connection()
+        connection.parse_from_uri(
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type={database_type}&"
+            "project_id={project_id}&location={location}&instance={instance_name}&"
+            "use_proxy={use_proxy}&use_ssl={use_ssl}".
+            format(database_type=database_type,
+                   project_id=project_id,
+                   location=location,
+                   instance_name=instance_name,
+                   use_proxy=use_proxy,
+                   use_ssl=use_ssl))
+        get_connections.return_value = [connection]
+        with self.assertRaises(AirflowException) as cm:
+            CloudSqlQueryOperator(
+                sql=sql,
+                task_id='task_id'
+            )
+        err = cm.exception
+        self.assertIn(message, str(err))
+
+    @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
+    def test_create_operator_with_correct_parameters_postgres(self, get_connections):
+        connection = Connection()
+        connection.parse_from_uri(
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&"
+            "project_id=example-project&location=europe-west1&instance=testdb&"
+            "use_proxy=False&use_ssl=False")
+        get_connections.return_value = [connection]
+        operator = CloudSqlQueryOperator(
+            sql=['SELECT * FROM TABLE'],
+            task_id='task_id'
+        )
+        operator.cloudsql_db_hook.create_connection()
+        try:
+            db_hook = operator.cloudsql_db_hook.get_database_hook()
+            conn = db_hook._get_connections_from_db(db_hook.postgres_conn_id)[0]
+        finally:
+            operator.cloudsql_db_hook.delete_connection()
+        self.assertEqual('postgres', conn.conn_type)
+        self.assertEqual('8.8.8.8', conn.host)
+        self.assertEqual(3200, conn.port)
+        self.assertEqual('testdb', conn.schema)
+
+    @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
+    def test_create_operator_with_correct_parameters_postgres_ssl(self, get_connections):
+        connection = Connection()
+        connection.parse_from_uri(
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&"
+            "project_id=example-project&location=europe-west1&instance=testdb&"
+            "use_proxy=False&use_ssl=True&sslcert=/bin/bash&"
+            "sslkey=/bin/bash&sslrootcert=/bin/bash")
+        get_connections.return_value = [connection]
+        operator = CloudSqlQueryOperator(
+            sql=['SELECT * FROM TABLE'],
+            task_id='task_id'
+        )
+        operator.cloudsql_db_hook.create_connection()
+        try:
+            db_hook = operator.cloudsql_db_hook.get_database_hook()
+            conn = db_hook._get_connections_from_db(db_hook.postgres_conn_id)[0]
+        finally:
+            operator.cloudsql_db_hook.delete_connection()
+        self.assertEqual('postgres', conn.conn_type)
+        self.assertEqual('8.8.8.8', conn.host)
+        self.assertEqual(3200, conn.port)
+        self.assertEqual('testdb', conn.schema)
+        self.assertEqual('/bin/bash', conn.extra_dejson['sslkey'])
+        self.assertEqual('/bin/bash', conn.extra_dejson['sslcert'])
+        self.assertEqual('/bin/bash', conn.extra_dejson['sslrootcert'])
+
+    @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
+    def test_create_operator_with_correct_parameters_postgres_proxy_socket(
+            self, get_connections):
+        connection = Connection()
+        connection.parse_from_uri(
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&"
+            "project_id=example-project&location=europe-west1&instance=testdb&"
+            "use_proxy=True&sql_proxy_use_tcp=False")
+        get_connections.return_value = [connection]
+        operator = CloudSqlQueryOperator(
+            sql=['SELECT * FROM TABLE'],
+            task_id='task_id'
+        )
+        operator.cloudsql_db_hook.create_connection()
+        try:
+            db_hook = operator.cloudsql_db_hook.get_database_hook()
+            conn = db_hook._get_connections_from_db(db_hook.postgres_conn_id)[0]
+        finally:
+            operator.cloudsql_db_hook.delete_connection()
+        self.assertEqual('postgres', conn.conn_type)
+        self.assertIn('/tmp', conn.host)
+        self.assertIn('example-project:europe-west1:testdb', conn.host)
+        self.assertIsNone(conn.port)
+        self.assertEqual('testdb', conn.schema)
+
+    @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
+    def test_create_operator_with_correct_parameters_postgres_proxy_tcp(self,
+                                                                        get_connections):
+        connection = Connection()
+        connection.parse_from_uri(
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=postgres&"
+            "project_id=example-project&location=europe-west1&instance=testdb&"
+            "use_proxy=True&sql_proxy_use_tcp=True")
+        get_connections.return_value = [connection]
+        operator = CloudSqlQueryOperator(
+            sql=['SELECT * FROM TABLE'],
+            task_id='task_id'
+        )
+        operator.cloudsql_db_hook.create_connection()
+        try:
+            db_hook = operator.cloudsql_db_hook.get_database_hook()
+            conn = db_hook._get_connections_from_db(db_hook.postgres_conn_id)[0]
+        finally:
+            operator.cloudsql_db_hook.delete_connection()
+        self.assertEqual('postgres', conn.conn_type)
+        self.assertEqual('127.0.0.1', conn.host)
+        self.assertNotEqual(3200, conn.port)
+        self.assertEqual('testdb', conn.schema)
+
+    @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
+    def test_create_operator_with_correct_parameters_mysql(self, get_connections):
+        connection = Connection()
+        connection.parse_from_uri(
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&"
+            "project_id=example-project&location=europe-west1&instance=testdb&"
+            "use_proxy=False&use_ssl=False")
+        get_connections.return_value = [connection]
+        operator = CloudSqlQueryOperator(
+            sql=['SELECT * FROM TABLE'],
+            task_id='task_id'
+        )
+        operator.cloudsql_db_hook.create_connection()
+        try:
+            db_hook = operator.cloudsql_db_hook.get_database_hook()
+            conn = db_hook._get_connections_from_db(db_hook.mysql_conn_id)[0]
+        finally:
+            operator.cloudsql_db_hook.delete_connection()
+        self.assertEqual('mysql', conn.conn_type)
+        self.assertEqual('8.8.8.8', conn.host)
+        self.assertEqual(3200, conn.port)
+        self.assertEqual('testdb', conn.schema)
+
+    @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
+    def test_create_operator_with_correct_parameters_mysql_ssl(self, get_connections):
+        connection = Connection()
+        connection.parse_from_uri(
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&"
+            "project_id=example-project&location=europe-west1&instance=testdb&"
+            "use_proxy=False&use_ssl=True&sslcert=/bin/bash&"
+            "sslkey=/bin/bash&sslrootcert=/bin/bash")
+        get_connections.return_value = [connection]
+        operator = CloudSqlQueryOperator(
+            sql=['SELECT * FROM TABLE'],
+            task_id='task_id'
+        )
+        operator.cloudsql_db_hook.create_connection()
+        try:
+            db_hook = operator.cloudsql_db_hook.get_database_hook()
+            conn = db_hook._get_connections_from_db(db_hook.mysql_conn_id)[0]
+        finally:
+            operator.cloudsql_db_hook.delete_connection()
+        self.assertEqual('mysql', conn.conn_type)
+        self.assertEqual('8.8.8.8', conn.host)
+        self.assertEqual(3200, conn.port)
+        self.assertEqual('testdb', conn.schema)
+        self.assertEqual('/bin/bash', json.loads(conn.extra_dejson['ssl'])['cert'])
+        self.assertEqual('/bin/bash', json.loads(conn.extra_dejson['ssl'])['key'])
+        self.assertEqual('/bin/bash', json.loads(conn.extra_dejson['ssl'])['ca'])
+
+    @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
+    def test_create_operator_with_correct_parameters_mysql_proxy_socket(self,
+                                                                        get_connections):
+        connection = Connection()
+        connection.parse_from_uri(
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&"
+            "project_id=example-project&location=europe-west1&instance=testdb&"
+            "use_proxy=True&sql_proxy_use_tcp=False")
+        get_connections.return_value = [connection]
+        operator = CloudSqlQueryOperator(
+            sql=['SELECT * FROM TABLE'],
+            task_id='task_id'
+        )
+        operator.cloudsql_db_hook.create_connection()
+        try:
+            db_hook = operator.cloudsql_db_hook.get_database_hook()
+            conn = db_hook._get_connections_from_db(db_hook.mysql_conn_id)[0]
+        finally:
+            operator.cloudsql_db_hook.delete_connection()
+        self.assertEqual('mysql', conn.conn_type)
+        self.assertEqual('localhost', conn.host)
+        self.assertIn('/tmp', conn.extra_dejson['unix_socket'])
+        self.assertIn('example-project:europe-west1:testdb',
+                      conn.extra_dejson['unix_socket'])
+        self.assertIsNone(conn.port)
+        self.assertEqual('testdb', conn.schema)
+
+    @mock.patch("airflow.hooks.base_hook.BaseHook.get_connections")
+    def test_create_operator_with_correct_parameters_mysql_tcp(self, get_connections):
+        connection = Connection()
+        connection.parse_from_uri(
+            "gcpcloudsql://user:password@8.8.8.8:3200/testdb?database_type=mysql&"
+            "project_id=example-project&location=europe-west1&instance=testdb&"
+            "use_proxy=True&sql_proxy_use_tcp=True")
+        get_connections.return_value = [connection]
+        operator = CloudSqlQueryOperator(
+            sql=['SELECT * FROM TABLE'],
+            task_id='task_id'
+        )
+        operator.cloudsql_db_hook.create_connection()
+        try:
+            db_hook = operator.cloudsql_db_hook.get_database_hook()
+            conn = db_hook._get_connections_from_db(db_hook.mysql_conn_id)[0]
+        finally:
+            operator.cloudsql_db_hook.delete_connection()
+        self.assertEqual('mysql', conn.conn_type)
+        self.assertEqual('127.0.0.1', conn.host)
+        self.assertNotEqual(3200, conn.port)
+        self.assertEqual('testdb', conn.schema)
+
+
+@unittest.skipIf(
+    BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
+class CloudSqlProxyIntegrationTest(BaseGcpIntegrationTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudSqlProxyIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_sql_query',
+            gcp_key='gcp_cloudsql.json')
+
+    def test_start_proxy_fail_no_parameters(self):
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
+                                     project_id=PROJECT_ID,
+                                     instance_specification='a')
+        with self.assertRaises(AirflowException) as cm:
+            runner.start_proxy()
+        err = cm.exception
+        self.assertIn("invalid instance name", str(err))
+        with self.assertRaises(AirflowException) as cm:
+            runner.start_proxy()
+        err = cm.exception
+        self.assertIn("invalid instance name", str(err))
+        self.assertIsNone(runner.sql_proxy_process)
+
+    def test_start_proxy_with_all_instances(self):
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
+                                     project_id=PROJECT_ID,
+                                     instance_specification='')
+        try:
+            runner.start_proxy()
+            time.sleep(1)
+        finally:
+            runner.stop_proxy()
+        self.assertIsNone(runner.sql_proxy_process)
+
+    def test_start_proxy_with_all_instances_generated_credential_file(self):
+        self.update_connection_with_dictionary()
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
+                                     project_id=PROJECT_ID,
+                                     instance_specification='')
+        try:
+            runner.start_proxy()
+            time.sleep(1)
+        finally:
+            runner.stop_proxy()
+        self.assertIsNone(runner.sql_proxy_process)
+
+    def test_start_proxy_with_all_instances_specific_version(self):
+        runner = CloudSqlProxyRunner(path_prefix='/tmp/' + str(uuid1()),
+                                     project_id=PROJECT_ID,
+                                     instance_specification='',
+                                     sql_proxy_version='v1.13')
+        try:
+            runner.start_proxy()
+            time.sleep(1)
+        finally:
+            runner.stop_proxy()
+        self.assertIsNone(runner.sql_proxy_process)
+        self.assertEqual(runner.get_proxy_version(), "1.13")
+
+
+@unittest.skipIf(
+    BaseGcpIntegrationTestCase.skip_check(GCP_CLOUDSQL_KEY), SKIP_TEST_WARNING)
+class CloudSqlQueryExampleDagsIntegrationTest(BaseGcpIntegrationTestCase):
+    def __init__(self, method_name='runTest'):
+        super(CloudSqlQueryExampleDagsIntegrationTest, self).__init__(
+            method_name,
+            dag_id='example_gcp_sql_query',
+            gcp_key=GCP_CLOUDSQL_KEY)
+
+    def test_run_example_dag_cloudsql_query(self):
+        self._run_dag()
diff --git a/tests/dags/.gitignore b/tests/dags/.gitignore
new file mode 100644
index 0000000000..beddda487f
--- /dev/null
+++ b/tests/dags/.gitignore
@@ -0,0 +1,5 @@
+# This file is here to avoid accidental commits of example dags used for integration
+# testing. In order to test example dags easily we often create symbolic links in this
+# directory and run Airflow with AIRFLOW__CORE__UNIT_TEST_MODE=True.
+# The pattern below prevents accidental committing of such symbolic links.
+example_*


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Google Cloud SQL Query operator
> -------------------------------
>
>                 Key: AIRFLOW-3275
>                 URL: https://issues.apache.org/jira/browse/AIRFLOW-3275
>             Project: Apache Airflow
>          Issue Type: New Feature
>          Components: contrib
>            Reporter: Jarek Potiuk
>            Priority: Major
>
> Operator that performs a DDL or DML SQL query in a Google Cloud SQL instance. DQL (retrieving data from Google Cloud SQL) is not supported - you may run SELECT queries, but the results of those queries are discarded.
> You should be able to specify various connectivity methods to connect to a running instance - starting from a public IP plain connection, through public IP with SSL, to both TCP and socket connections via Cloud SQL Proxy. The proxy should be downloaded and started/stopped dynamically as needed by the operator.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
