airflow-commits mailing list archives

From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1829] Support for schema updates in query jobs
Date Mon, 11 Dec 2017 22:37:51 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 30076f1e4 -> 8ba86072f


[AIRFLOW-1829] Support for schema updates in query jobs

Closes #2796 from wileeam/bq-operator-query-schema-update-support


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8ba86072
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8ba86072
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8ba86072

Branch: refs/heads/master
Commit: 8ba86072f9c5ef81933cd6546e7e2f000f862053
Parents: 30076f1
Author: Guillermo Rodríguez Cano <wschutz@gmail.com>
Authored: Mon Dec 11 14:36:35 2017 -0800
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Mon Dec 11 14:36:46 2017 -0800

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py         | 434 ++++++++++++--------
 airflow/contrib/operators/bigquery_operator.py |  41 +-
 2 files changed, 281 insertions(+), 194 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ba86072/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 3f9e8af..3b4ce0f 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -12,27 +12,28 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 """
 This module contains a BigQuery Hook, as well as a very basic PEP 249
 implementation for BigQuery.
 """
 
 import time
-
-from apiclient.discovery import build, HttpError
-from googleapiclient import errors
 from builtins import range
-from pandas_gbq.gbq import GbqConnector, \
-    _parse_data as gbq_parse_data, \
-    _check_google_client_version as gbq_check_google_client_version, \
-    _test_google_api_imports as gbq_test_google_api_imports
-from pandas.tools.merge import concat
+
 from past.builtins import basestring
 
 from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
 from airflow.hooks.dbapi_hook import DbApiHook
 from airflow.utils.log.logging_mixin import LoggingMixin
+from apiclient.discovery import HttpError, build
+from googleapiclient import errors
+from pandas.tools.merge import concat
+from pandas_gbq.gbq import \
+    _check_google_client_version as gbq_check_google_client_version
+from pandas_gbq.gbq import _parse_data as gbq_parse_data
+from pandas_gbq.gbq import \
+    _test_google_api_imports as gbq_test_google_api_imports
+from pandas_gbq.gbq import GbqConnector
 
 
 class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
@@ -42,12 +43,9 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
     """
     conn_name_attr = 'bigquery_conn_id'
 
-    def __init__(self,
-                 bigquery_conn_id='bigquery_default',
-                 delegate_to=None):
+    def __init__(self, bigquery_conn_id='bigquery_default', delegate_to=None):
         super(BigQueryHook, self).__init__(
-            conn_id=bigquery_conn_id,
-            delegate_to=delegate_to)
+            conn_id=bigquery_conn_id, delegate_to=delegate_to)
 
     def get_conn(self):
         """
@@ -83,7 +81,8 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
 
         :param bql: The BigQuery SQL to execute.
         :type bql: string
-        :param parameters: The parameters to render the SQL query with (not used, leave to override superclass method)
+        :param parameters: The parameters to render the SQL query with (not
+            used, leave to override superclass method)
         :type parameters: mapping or iterable
         :param dialect: Dialect of BigQuery SQL – legacy SQL or standard SQL
         :type dialect: string in {'legacy', 'standard'}, default 'legacy'
@@ -107,11 +106,12 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
         """
         Checks for the existence of a table in Google BigQuery.
 
-        :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook
-        must provide access to the specified project.
+        :param project_id: The Google cloud project in which to look for the
+            table. The connection supplied to the hook must provide access to
+            the specified project.
         :type project_id: string
-        :param dataset_id: The name of the dataset in which to look for the table.
-            storage bucket.
+        :param dataset_id: The name of the dataset in which to look for the
+            table.
         :type dataset_id: string
         :param table_id: The name of the table to check the existence of.
         :type table_id: string
@@ -119,10 +119,8 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
         service = self.get_service()
         try:
             service.tables().get(
-                projectId=project_id,
-                datasetId=dataset_id,
-                tableId=table_id
-            ).execute()
+                projectId=project_id, datasetId=dataset_id,
+                tableId=table_id).execute()
             return True
         except errors.HttpError as e:
             if e.resp['status'] == '404':
@@ -138,7 +136,13 @@ class BigQueryPandasConnector(GbqConnector):
     without forcing a three legged OAuth connection. Instead, we can inject
     service account credentials into the binding.
     """
-    def __init__(self, project_id, service, reauth=False, verbose=False, dialect='legacy'):
+
+    def __init__(self,
+                 project_id,
+                 service,
+                 reauth=False,
+                 verbose=False,
+                 dialect='legacy'):
         gbq_check_google_client_version()
         gbq_test_google_api_imports()
         self.project_id = project_id
@@ -182,20 +186,23 @@ class BigQueryBaseCursor(LoggingMixin):
     BigQuery. The methods can be used directly by operators, in cases where a
     PEP 249 cursor isn't needed.
     """
+
     def __init__(self, service, project_id):
         self.service = service
         self.project_id = project_id
         self.running_job_id = None
 
-    def run_query(
-            self, bql, destination_dataset_table = False,
-            write_disposition = 'WRITE_EMPTY',
-            allow_large_results=False,
-            udf_config = False,
-            use_legacy_sql=True,
-            maximum_billing_tier=None,
-            create_disposition='CREATE_IF_NEEDED',
-            query_params=None):
+    def run_query(self,
+                  bql,
+                  destination_dataset_table=False,
+                  write_disposition='WRITE_EMPTY',
+                  allow_large_results=False,
+                  udf_config=False,
+                  use_legacy_sql=True,
+                  maximum_billing_tier=None,
+                  create_disposition='CREATE_IF_NEEDED',
+                  query_params=None,
+                  schema_update_options=()):
         """
         Executes a BigQuery SQL query. Optionally persists results in a BigQuery
         table. See here:
@@ -211,8 +218,6 @@ class BigQueryBaseCursor(LoggingMixin):
         :param write_disposition: What to do if the table already exists in
             BigQuery.
         :type write_disposition: string
-        :param create_disposition: Specifies whether the job is allowed to create new tables.
-        :type create_disposition: string
         :param allow_large_results: Whether to allow large results.
         :type allow_large_results: boolean
         :param udf_config: The User Defined Function configuration for the query.
@@ -220,9 +225,34 @@ class BigQueryBaseCursor(LoggingMixin):
         :type udf_config: list
         :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
         :type use_legacy_sql: boolean
-        :param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price.
+        :param maximum_billing_tier: Positive integer that serves as a
+            multiplier of the basic price.
         :type maximum_billing_tier: integer
+        :param create_disposition: Specifies whether the job is allowed to
+            create new tables.
+        :type create_disposition: string
+        :param query_params: a dictionary containing query parameter types and
+            values, passed to BigQuery
+        :type query_params: dict
+        :param schema_update_options: Allows the schema of the destination
+            table to be updated as a side effect of the query job.
+        :type schema_update_options: tuple
         """
+
+        # BigQuery also allows you to define how you want a table's schema to change
+        # as a side effect of a query job
+        # for more details:
+        #   https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.schemaUpdateOptions
+        allowed_schema_update_options = [
+            'ALLOW_FIELD_ADDITION', "ALLOW_FIELD_RELAXATION"
+        ]
+        if not set(allowed_schema_update_options).issuperset(
+                set(schema_update_options)):
+            raise ValueError(
+                "{0} contains invalid schema update options. "
+                "Please only use one or more of the following options: {1}"
+                .format(schema_update_options, allowed_schema_update_options))
+
         configuration = {
             'query': {
                 'query': bql,
@@ -239,9 +269,12 @@ class BigQueryBaseCursor(LoggingMixin):
                 _split_tablename(table_input=destination_dataset_table,
                                  default_project_id=self.project_id)
             configuration['query'].update({
-                'allowLargeResults': allow_large_results,
-                'writeDisposition': write_disposition,
-                'createDisposition': create_disposition,
+                'allowLargeResults':
+                allow_large_results,
+                'writeDisposition':
+                write_disposition,
+                'createDisposition':
+                create_disposition,
                 'destinationTable': {
                     'projectId': destination_project,
                     'datasetId': destination_dataset,
@@ -251,17 +284,38 @@ class BigQueryBaseCursor(LoggingMixin):
         if udf_config:
             assert isinstance(udf_config, list)
             configuration['query'].update({
-                'userDefinedFunctionResources': udf_config
+                'userDefinedFunctionResources':
+                udf_config
             })
 
         if query_params:
-            configuration['query']['queryParameters'] = query_params
+            if use_legacy_sql:
+                raise ValueError("Query paramaters are not allowed when using "
+                                 "legacy SQL")
+            else:
+                configuration['query']['queryParameters'] = query_params
+
+        if schema_update_options:
+            if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:
+                raise ValueError("schema_update_options is only "
+                                 "allowed if write_disposition is "
+                                 "'WRITE_APPEND' or 'WRITE_TRUNCATE'.")
+            else:
+                self.log.info(
+                    "Adding experimental "
+                    "'schemaUpdateOptions': {0}".format(schema_update_options))
+                configuration['query'][
+                    'schemaUpdateOptions'] = schema_update_options
 
         return self.run_with_configuration(configuration)
 
     def run_extract(  # noqa
-            self, source_project_dataset_table, destination_cloud_storage_uris,
-            compression='NONE', export_format='CSV', field_delimiter=',',
+            self,
+            source_project_dataset_table,
+            destination_cloud_storage_uris,
+            compression='NONE',
+            export_format='CSV',
+            field_delimiter=',',
             print_header=True):
         """
         Executes a BigQuery extract command to copy data from BigQuery to
@@ -344,10 +398,10 @@ class BigQueryBaseCursor(LoggingMixin):
         :param create_disposition: The create disposition if the table doesn't exist.
         :type create_disposition: string
         """
-        source_project_dataset_tables = (
-            [source_project_dataset_tables]
-            if not isinstance(source_project_dataset_tables, list)
-            else source_project_dataset_tables)
+        source_project_dataset_tables = ([
+            source_project_dataset_tables
+        ] if not isinstance(source_project_dataset_tables, list) else
+            source_project_dataset_tables)
 
         source_project_dataset_tables_fixup = []
         for source_project_dataset_table in source_project_dataset_tables:
@@ -356,9 +410,12 @@ class BigQueryBaseCursor(LoggingMixin):
                                  default_project_id=self.project_id,
                                  var_name='source_project_dataset_table')
             source_project_dataset_tables_fixup.append({
-                'projectId': source_project,
-                'datasetId': source_dataset,
-                'tableId': source_table
+                'projectId':
+                source_project,
+                'datasetId':
+                source_dataset,
+                'tableId':
+                source_table
             })
 
         destination_project, destination_dataset, destination_table = \
@@ -381,7 +438,8 @@ class BigQueryBaseCursor(LoggingMixin):
 
     def run_load(self,
                  destination_project_dataset_table,
-                 schema_fields, source_uris,
+                 schema_fields,
+                 source_uris,
                  source_format='CSV',
                  create_disposition='CREATE_IF_NEEDED',
                  skip_leading_rows=0,
@@ -437,7 +495,7 @@ class BigQueryBaseCursor(LoggingMixin):
         :type allow_jagged_rows: bool
         :param schema_update_options: Allows the schema of the destination
             table to be updated as a side effect of the load job.
-        :type schema_update_options: list
+        :type schema_update_options: tuple
         :param src_fmt_configs: configure optional fields specific to the source format
         :type src_fmt_configs: dict
         """
@@ -448,26 +506,28 @@ class BigQueryBaseCursor(LoggingMixin):
         # Refer to this link for more details:
         #   https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat
         source_format = source_format.upper()
-        allowed_formats = ["CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS", "DATASTORE_BACKUP"]
+        allowed_formats = [
+            "CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS",
+            "DATASTORE_BACKUP"
+        ]
         if source_format not in allowed_formats:
             raise ValueError("{0} is not a valid source format. "
-                    "Please use one of the following types: {1}"
-                    .format(source_format, allowed_formats))
+                             "Please use one of the following types: {1}"
+                             .format(source_format, allowed_formats))
 
         # bigquery also allows you to define how you want a table's schema to change
         # as a side effect of a load
         # for more details:
         #   https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions
         allowed_schema_update_options = [
-            'ALLOW_FIELD_ADDITION',
-            "ALLOW_FIELD_RELAXATION"
+            'ALLOW_FIELD_ADDITION', "ALLOW_FIELD_RELAXATION"
         ]
-        if not set(allowed_schema_update_options).issuperset(set(schema_update_options)):
+        if not set(allowed_schema_update_options).issuperset(
+                set(schema_update_options)):
             raise ValueError(
                 "{0} contains invalid schema update options. "
                 "Please only use one or more of the following options: {1}"
-                .format(schema_update_options, allowed_schema_update_options)
-            )
+                .format(schema_update_options, allowed_schema_update_options))
 
         destination_project, destination_dataset, destination_table = \
             _split_tablename(table_input=destination_project_dataset_table,
@@ -488,23 +548,19 @@ class BigQueryBaseCursor(LoggingMixin):
             }
         }
         if schema_fields:
-            configuration['load']['schema'] = {
-                'fields': schema_fields
-            }
+            configuration['load']['schema'] = {'fields': schema_fields}
 
         if schema_update_options:
             if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:
-                raise ValueError(
-                    "schema_update_options is only "
-                    "allowed if write_disposition is "
-                    "'WRITE_APPEND' or 'WRITE_TRUNCATE'."
-                )
+                raise ValueError("schema_update_options is only "
+                                 "allowed if write_disposition is "
+                                 "'WRITE_APPEND' or 'WRITE_TRUNCATE'.")
             else:
                 self.log.info(
                     "Adding experimental "
-                    "'schemaUpdateOptions': {0}".format(schema_update_options)
-                )
-                configuration['load']['schemaUpdateOptions'] = schema_update_options
+                    "'schemaUpdateOptions': {0}".format(schema_update_options))
+                configuration['load'][
+                    'schemaUpdateOptions'] = schema_update_options
 
         if max_bad_records:
             configuration['load']['maxBadRecords'] = max_bad_records
@@ -521,16 +577,20 @@ class BigQueryBaseCursor(LoggingMixin):
             src_fmt_configs['allowQuotedNewlines'] = allow_quoted_newlines
 
         src_fmt_to_configs_mapping = {
-            'CSV': ['allowJaggedRows', 'allowQuotedNewlines', 'autodetect',
-                    'fieldDelimiter', 'skipLeadingRows', 'ignoreUnknownValues',
-                    'nullMarker', 'quote'],
+            'CSV': [
+                'allowJaggedRows', 'allowQuotedNewlines', 'autodetect',
+                'fieldDelimiter', 'skipLeadingRows', 'ignoreUnknownValues',
+                'nullMarker', 'quote'
+            ],
             'DATASTORE_BACKUP': ['projectionFields'],
             'NEWLINE_DELIMITED_JSON': ['autodetect', 'ignoreUnknownValues'],
             'AVRO': [],
         }
         valid_configs = src_fmt_to_configs_mapping[source_format]
-        src_fmt_configs = {k: v for k, v in src_fmt_configs.items()
-                           if k in valid_configs}
+        src_fmt_configs = {
+            k: v
+            for k, v in src_fmt_configs.items() if k in valid_configs
+        }
         configuration['load'].update(src_fmt_configs)
 
         if allow_jagged_rows:
@@ -552,9 +612,7 @@ class BigQueryBaseCursor(LoggingMixin):
             details.
         """
         jobs = self.service.jobs()
-        job_data = {
-            'configuration': configuration
-        }
+        job_data = {'configuration': configuration}
 
         # Send query and wait for reply.
         query_reply = jobs \
@@ -566,30 +624,34 @@ class BigQueryBaseCursor(LoggingMixin):
         keep_polling_job = True
         while (keep_polling_job):
             try:
-                job = jobs.get(projectId=self.project_id, jobId=self.running_job_id).execute()
+                job = jobs.get(
+                    projectId=self.project_id,
+                    jobId=self.running_job_id).execute()
                 if (job['status']['state'] == 'DONE'):
                     keep_polling_job = False
                     # Check if job had errors.
                     if 'errorResult' in job['status']:
                         raise Exception(
-                            'BigQuery job failed. Final error was: {}. The job was: {}'.format(
-                                job['status']['errorResult'], job
-                            )
-                        )
+                            'BigQuery job failed. Final error was: {}. The job was: {}'.
+                            format(job['status']['errorResult'], job))
                 else:
-                    self.log.info('Waiting for job to complete : %s, %s', self.project_id, self.running_job_id)
+                    self.log.info('Waiting for job to complete : %s, %s',
+                                  self.project_id, self.running_job_id)
                     time.sleep(5)
 
             except HttpError as err:
                 if err.resp.status in [500, 503]:
-                    self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, self.running_job_id)
+                    self.log.info(
+                        '%s: Retryable error, waiting for job to complete: %s',
+                        err.resp.status, self.running_job_id)
                     time.sleep(5)
                 else:
                     raise Exception(
-                        'BigQuery job status check failed. Final error was: %s', err.resp.status)
+                        'BigQuery job status check failed. Final error was: %s',
+                        err.resp.status)
 
         return self.running_job_id
-        
+
     def poll_job_complete(self, job_id):
         jobs = self.service.jobs()
         try:
@@ -598,39 +660,50 @@ class BigQueryBaseCursor(LoggingMixin):
                 return True
         except HttpError as err:
             if err.resp.status in [500, 503]:
-                self.log.info('%s: Retryable error while polling job with id %s', err.resp.status, job_id)
+                self.log.info(
+                    '%s: Retryable error while polling job with id %s',
+                    err.resp.status, job_id)
             else:
                 raise Exception(
-                    'BigQuery job status check failed. Final error was: %s', err.resp.status)
+                    'BigQuery job status check failed. Final error was: %s',
+                    err.resp.status)
         return False
-      
-        
+
     def cancel_query(self):
         """
         Cancel all started queries that have not yet completed
         """
         jobs = self.service.jobs()
-        if (self.running_job_id and not self.poll_job_complete(self.running_job_id)):
-            self.log.info('Attempting to cancel job : %s, %s', self.project_id, self.running_job_id)
-            jobs.cancel(projectId=self.project_id, jobId=self.running_job_id).execute()
+        if (self.running_job_id and
+                not self.poll_job_complete(self.running_job_id)):
+            self.log.info('Attempting to cancel job : %s, %s', self.project_id,
+                          self.running_job_id)
+            jobs.cancel(
+                projectId=self.project_id,
+                jobId=self.running_job_id).execute()
         else:
             self.log.info('No running BigQuery jobs to cancel.')
             return
-        
+
         # Wait for all the calls to cancel to finish
         max_polling_attempts = 12
         polling_attempts = 0
-        
+
         job_complete = False
         while (polling_attempts < max_polling_attempts and not job_complete):
-            polling_attempts = polling_attempts+1
+            polling_attempts = polling_attempts + 1
             job_complete = self.poll_job_complete(self.running_job_id)
             if (job_complete):
-                self.log.info('Job successfully canceled: %s, %s', self.project_id, self.running_job_id)
-            elif(polling_attempts == max_polling_attempts):
-                self.log.info('Stopping polling due to timeout. Job with id %s has not completed cancel and may or may not finish.', self.running_job_id)
+                self.log.info('Job successfully canceled: %s, %s',
+                              self.project_id, self.running_job_id)
+            elif (polling_attempts == max_polling_attempts):
+                self.log.info(
+                    "Stopping polling due to timeout. Job with id %s "
+                    "has not completed cancel and may or may not finish.",
+                    self.running_job_id)
             else:
-                self.log.info('Waiting for canceled job with id %s to finish.', self.running_job_id)
+                self.log.info('Waiting for canceled job with id %s to finish.',
+                              self.running_job_id)
                 time.sleep(5)
 
     def get_schema(self, dataset_id, table_id):
@@ -647,8 +720,12 @@ class BigQueryBaseCursor(LoggingMixin):
             .execute()
         return tables_resource['schema']
 
-    def get_tabledata(self, dataset_id, table_id,
-                      max_results=None, page_token=None, start_index=None):
+    def get_tabledata(self,
+                      dataset_id,
+                      table_id,
+                      max_results=None,
+                      page_token=None,
+                      start_index=None):
         """
         Get the data of a given dataset.table.
         see https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list
@@ -668,15 +745,14 @@ class BigQueryBaseCursor(LoggingMixin):
             optional_params['pageToken'] = page_token
         if start_index:
             optional_params['startIndex'] = start_index
-        return (
-            self.service.tabledata()
-            .list(
-                projectId=self.project_id, datasetId=dataset_id,
-                tableId=table_id, **optional_params)
-            .execute()
-        )
-
-    def run_table_delete(self, deletion_dataset_table, ignore_if_missing=False):
+        return (self.service.tabledata().list(
+            projectId=self.project_id,
+            datasetId=dataset_id,
+            tableId=table_id,
+            **optional_params).execute())
+
+    def run_table_delete(self, deletion_dataset_table,
+                         ignore_if_missing=False):
         """
         Delete an existing table from the dataset;
         If the table does not exist, return an error unless ignore_if_missing
@@ -705,16 +781,14 @@ class BigQueryBaseCursor(LoggingMixin):
                         datasetId=deletion_dataset,
                         tableId=deletion_table) \
                 .execute()
-            self.log.info('Deleted table %s:%s.%s.',
-                          deletion_project, deletion_dataset, deletion_table)
+            self.log.info('Deleted table %s:%s.%s.', deletion_project,
+                          deletion_dataset, deletion_table)
         except HttpError:
             if not ignore_if_missing:
-                raise Exception(
-                    'Table deletion failed. Table does not exist.')
+                raise Exception('Table deletion failed. Table does not exist.')
             else:
                 self.log.info('Table does not exist. Skipping.')
 
-
     def run_table_upsert(self, dataset_id, table_resource, project_id=None):
         """
         creates a new, empty table in the dataset;
@@ -734,20 +808,19 @@ class BigQueryBaseCursor(LoggingMixin):
         # check to see if the table exists
         table_id = table_resource['tableReference']['tableId']
         project_id = project_id if project_id is not None else self.project_id
-        tables_list_resp = self.service.tables().list(projectId=project_id,
-                                                      datasetId=dataset_id).execute()
+        tables_list_resp = self.service.tables().list(
+            projectId=project_id, datasetId=dataset_id).execute()
         while True:
             for table in tables_list_resp.get('tables', []):
                 if table['tableReference']['tableId'] == table_id:
                     # found the table, do update
-                    self.log.info(
-                        'Table %s:%s.%s exists, updating.',
-                        project_id, dataset_id, table_id
-                    )
-                    return self.service.tables().update(projectId=project_id,
-                                                        datasetId=dataset_id,
-                                                        tableId=table_id,
-                                                        body=table_resource).execute()
+                    self.log.info('Table %s:%s.%s exists, updating.',
+                                  project_id, dataset_id, table_id)
+                    return self.service.tables().update(
+                        projectId=project_id,
+                        datasetId=dataset_id,
+                        tableId=table_id,
+                        body=table_resource).execute()
             # If there is a next page, we need to check the next page.
             if 'nextPageToken' in tables_list_resp:
                 tables_list_resp = self.service.tables()\
@@ -758,20 +831,19 @@ class BigQueryBaseCursor(LoggingMixin):
             # If there is no next page, then the table doesn't exist.
             else:
                 # do insert
-                self.log.info(
-                    'Table %s:%s.%s does not exist. creating.',
-                    project_id, dataset_id, table_id
-                )
-                return self.service.tables().insert(projectId=project_id,
-                                                    datasetId=dataset_id,
-                                                    body=table_resource).execute()
+                self.log.info('Table %s:%s.%s does not exist. creating.',
+                              project_id, dataset_id, table_id)
+                return self.service.tables().insert(
+                    projectId=project_id,
+                    datasetId=dataset_id,
+                    body=table_resource).execute()
 
     def run_grant_dataset_view_access(self,
                                       source_dataset,
                                       view_dataset,
                                       view_table,
-                                      source_project = None,
-                                      view_project = None):
+                                      source_project=None,
+                                      view_project=None):
         """
         Grant authorized view access of a dataset to a view table.
         If this view has already been granted access to the dataset, do nothing.
@@ -798,28 +870,36 @@ class BigQueryBaseCursor(LoggingMixin):
 
         # we don't want to clobber any existing accesses, so we have to get
         # info on the dataset before we can add view access
-        source_dataset_resource = self.service.datasets().get(projectId=source_project,
-                                                              datasetId=source_dataset).execute()
-        access = source_dataset_resource['access'] if 'access' in source_dataset_resource else []
-        view_access = {'view': {'projectId': view_project,
-                                'datasetId': view_dataset,
-                                'tableId': view_table}}
+        source_dataset_resource = self.service.datasets().get(
+            projectId=source_project, datasetId=source_dataset).execute()
+        access = source_dataset_resource[
+            'access'] if 'access' in source_dataset_resource else []
+        view_access = {
+            'view': {
+                'projectId': view_project,
+                'datasetId': view_dataset,
+                'tableId': view_table
+            }
+        }
         # check to see if the view we want to add already exists.
         if view_access not in access:
             self.log.info(
                 'Granting table %s:%s.%s authorized view access to %s:%s dataset.',
-                view_project, view_dataset, view_table, source_project, source_dataset
-            )
+                view_project, view_dataset, view_table, source_project,
+                source_dataset)
             access.append(view_access)
-            return self.service.datasets().patch(projectId=source_project,
-                                                 datasetId=source_dataset,
-                                                 body={'access': access}).execute()
+            return self.service.datasets().patch(
+                projectId=source_project,
+                datasetId=source_dataset,
+                body={
+                    'access': access
+                }).execute()
         else:
             # if view is already in access, do nothing.
             self.log.info(
                 'Table %s:%s.%s already has authorized view access to %s:%s dataset.',
-                view_project, view_dataset, view_table, source_project, source_dataset
-            )
+                view_project, view_dataset, view_table, source_project,
+                source_dataset)
             return source_dataset_resource
 
 
@@ -833,7 +913,8 @@ class BigQueryCursor(BigQueryBaseCursor):
     """
 
     def __init__(self, service, project_id):
-        super(BigQueryCursor, self).__init__(service=service, project_id=project_id)
+        super(BigQueryCursor, self).__init__(
+            service=service, project_id=project_id)
         self.buffersize = None
         self.page_token = None
         self.job_id = None
@@ -863,7 +944,8 @@ class BigQueryCursor(BigQueryBaseCursor):
         :param parameters: Parameters to substitute into the query.
         :type parameters: dict
         """
-        bql = _bind_parameters(operation, parameters) if parameters else operation
+        bql = _bind_parameters(operation,
+                               parameters) if parameters else operation
         self.job_id = self.run_query(bql)
 
     def executemany(self, operation, seq_of_parameters):
@@ -896,14 +978,10 @@ class BigQueryCursor(BigQueryBaseCursor):
             if self.all_pages_loaded:
                 return None
 
-            query_results = (
-                self.service.jobs()
-                .getQueryResults(
-                    projectId=self.project_id,
-                    jobId=self.job_id,
-                    pageToken=self.page_token)
-                .execute()
-            )
+            query_results = (self.service.jobs().getQueryResults(
+                projectId=self.project_id,
+                jobId=self.job_id,
+                pageToken=self.page_token).execute())
 
             if 'rows' in query_results and query_results['rows']:
                 self.page_token = query_results.get('pageToken')
@@ -1038,10 +1116,9 @@ def _split_tablename(table_input, default_project_id, var_name=None):
             return "Format exception for {var}: ".format(var=var_name)
 
     if table_input.count('.') + table_input.count(':') > 3:
-        raise Exception((
-            '{var}Use either : or . to specify project '
-            'got {input}'
-        ).format(var=var_print(var_name), input=table_input))
+        raise Exception(('{var}Use either : or . to specify project '
+                         'got {input}').format(
+                             var=var_print(var_name), input=table_input))
 
     cmpt = table_input.rsplit(':', 1)
     project_id = None
@@ -1054,16 +1131,14 @@ def _split_tablename(table_input, default_project_id, var_name=None):
             project_id = cmpt[0]
             rest = cmpt[1]
     else:
-        raise Exception((
-            '{var}Expect format of (<project:)<dataset>.<table>, '
-            'got {input}'
-        ).format(var=var_print(var_name), input=table_input))
+        raise Exception(('{var}Expect format of (<project:)<dataset>.<table>, '
+                         'got {input}').format(
+                             var=var_print(var_name), input=table_input))
 
     cmpt = rest.split('.')
     if len(cmpt) == 3:
-        assert project_id is None, (
-            "{var}Use either : or . to specify project"
-        ).format(var=var_print(var_name))
+        assert project_id is None, ("{var}Use either : or . to specify project"
+                                    ).format(var=var_print(var_name))
         project_id = cmpt[0]
         dataset_id = cmpt[1]
         table_id = cmpt[2]
@@ -1072,19 +1147,18 @@ def _split_tablename(table_input, default_project_id, var_name=None):
         dataset_id = cmpt[0]
         table_id = cmpt[1]
     else:
-        raise Exception((
-            '{var}Expect format of (<project.|<project:)<dataset>.<table>, '
-            'got {input}'
-        ).format(var=var_print(var_name), input=table_input))
+        raise Exception(
+            ('{var}Expect format of (<project.|<project:)<dataset>.<table>, '
+             'got {input}').format(var=var_print(var_name), input=table_input))
 
     if project_id is None:
         if var_name is not None:
             log = LoggingMixin().log
-            log.info(
-                'Project not included in {var}: {input}; using project "{project}"'.format(
-                    var=var_name, input=table_input, project=default_project_id
-                )
-            )
+            log.info('Project not included in {var}: {input}; '
+                     'using project "{project}"'.format(
+                         var=var_name,
+                         input=table_input,
+                         project=default_project_id))
         project_id = default_project_id
 
     return project_id, dataset_id, table_id
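
For illustration, a minimal sketch of calling the updated run_query() through the hook's cursor; the connection id, SQL, and table names are hypothetical placeholders. The behaviour follows the validation added above: only ALLOW_FIELD_ADDITION and ALLOW_FIELD_RELAXATION are accepted, and only together with WRITE_APPEND or WRITE_TRUNCATE.

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    # Hypothetical connection id and table names, for illustration only.
    hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = hook.get_conn().cursor()

    # Append query results to an existing table and let the query job add new
    # columns to the destination schema as a side effect.
    cursor.run_query(
        bql='SELECT * FROM my_dataset.source_table',
        destination_dataset_table='my_dataset.target_table',
        write_disposition='WRITE_APPEND',
        schema_update_options=('ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'))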

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ba86072/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index 19efa55..4b3ce75 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -46,16 +46,20 @@ class BigQueryOperator(BaseOperator):
     :type udf_config: list
     :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false).
     :type use_legacy_sql: boolean
-    :param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price.
+    :param maximum_billing_tier: Positive integer that serves as a multiplier
+        of the basic price.
         Defaults to None, in which case it uses the value set in the project.
     :type maximum_billing_tier: integer
-    :param query_params: a dictionary containing query parameter types and values, passed to
-        BigQuery.
+    :param schema_update_options: Allows the schema of the destination
+        table to be updated as a side effect of the query job.
+    :type schema_update_options: tuple
+    :param query_params: a dictionary containing query parameter types and
+        values, passed to BigQuery.
     :type query_params: dict
 
     """
     template_fields = ('bql', 'destination_dataset_table')
-    template_ext = ('.sql',)
+    template_ext = ('.sql', )
     ui_color = '#e4f0e8'
 
     @apply_defaults
@@ -70,6 +74,7 @@ class BigQueryOperator(BaseOperator):
                  use_legacy_sql=True,
                  maximum_billing_tier=None,
                  create_disposition='CREATE_IF_NEEDED',
+                 schema_update_options=(),
                  query_params=None,
                  *args,
                  **kwargs):
@@ -84,24 +89,32 @@ class BigQueryOperator(BaseOperator):
         self.udf_config = udf_config
         self.use_legacy_sql = use_legacy_sql
         self.maximum_billing_tier = maximum_billing_tier
+        self.schema_update_options = schema_update_options
         self.query_params = query_params
         self.bq_cursor = None
 
     def execute(self, context):
-        if(self.bq_cursor == None):
+        if self.bq_cursor is None:
             self.log.info('Executing: %s', self.bql)
-            hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
-                                delegate_to=self.delegate_to)
+            hook = BigQueryHook(
+                bigquery_conn_id=self.bigquery_conn_id,
+                delegate_to=self.delegate_to)
             conn = hook.get_conn()
             self.bq_cursor = conn.cursor()
-        self.bq_cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition,
-                         self.allow_large_results, self.udf_config,
-                         self.use_legacy_sql, self.maximum_billing_tier,
-                         self.create_disposition, self.query_params)
-        
-                         
+        self.bq_cursor.run_query(
+            self.bql,
+            destination_dataset_table=self.destination_dataset_table,
+            write_disposition=self.write_disposition,
+            allow_large_results=self.allow_large_results,
+            udf_config=self.udf_config,
+            use_legacy_sql=self.use_legacy_sql,
+            maximum_billing_tier=self.maximum_billing_tier,
+            create_disposition=self.create_disposition,
+            query_params=self.query_params,
+            schema_update_options=self.schema_update_options)
+
     def on_kill(self):
         super(BigQueryOperator, self).on_kill()
-        if(self.bq_cursor!=None):
+        if self.bq_cursor is not None:
             self.log.info('Canceling running query due to execution timeout')
             self.bq_cursor.cancel_query()
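
At the operator level the new argument is simply forwarded to run_query. A usage sketch, where the DAG, task id, SQL, and destination table are hypothetical:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import BigQueryOperator

    # Hypothetical DAG and task, for illustration only.
    dag = DAG('bq_schema_update_example',
              start_date=datetime(2017, 12, 1),
              schedule_interval='@daily')

    append_daily_aggregate = BigQueryOperator(
        task_id='append_daily_aggregate',
        bql='SELECT * FROM my_dataset.staging_table',
        destination_dataset_table='my_dataset.daily_aggregate',
        write_disposition='WRITE_APPEND',
        # Allow the query job to add new columns to the destination table.
        schema_update_options=('ALLOW_FIELD_ADDITION',),
        dag=dag)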


