From: criccomini@apache.org
To: commits@airflow.incubator.apache.org
Subject: incubator-airflow git commit: [AIRFLOW-1829] Support for schema updates in query jobs
Date: Mon, 11 Dec 2017 22:37:51 +0000 (UTC)

Repository: incubator-airflow
Updated Branches:
  refs/heads/master 30076f1e4 -> 8ba86072f


[AIRFLOW-1829] Support for schema updates in query jobs

Closes #2796 from wileeam/bq-operator-query-schema-update-support


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/8ba86072
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/8ba86072
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/8ba86072

Branch: refs/heads/master
Commit: 8ba86072f9c5ef81933cd6546e7e2f000f862053
Parents: 30076f1
Author: Guillermo Rodríguez Cano Authored: Mon Dec 11 14:36:35 2017 -0800 Committer: Chris Riccomini Committed: Mon Dec 11 14:36:46 2017 -0800 ---------------------------------------------------------------------- airflow/contrib/hooks/bigquery_hook.py | 434 ++++++++++++-------- airflow/contrib/operators/bigquery_operator.py | 41 +- 2 files changed, 281 insertions(+), 194 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ba86072/airflow/contrib/hooks/bigquery_hook.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py index 3f9e8af..3b4ce0f 100644 --- a/airflow/contrib/hooks/bigquery_hook.py +++ b/airflow/contrib/hooks/bigquery_hook.py @@ -12,27 +12,28 @@ # See the License for the specific language governing permissions and # limitations under the License. # - """ This module contains a BigQuery Hook, as well as a very basic PEP 249 implementation for BigQuery. """ import time - -from apiclient.discovery import build, HttpError -from googleapiclient import errors from builtins import range -from pandas_gbq.gbq import GbqConnector, \ - _parse_data as gbq_parse_data, \ - _check_google_client_version as gbq_check_google_client_version, \ - _test_google_api_imports as gbq_test_google_api_imports -from pandas.tools.merge import concat + from past.builtins import basestring from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook from airflow.hooks.dbapi_hook import DbApiHook from airflow.utils.log.logging_mixin import LoggingMixin +from apiclient.discovery import HttpError, build +from googleapiclient import errors +from pandas.tools.merge import concat +from pandas_gbq.gbq import \ + _check_google_client_version as gbq_check_google_client_version +from pandas_gbq.gbq import _parse_data as gbq_parse_data +from pandas_gbq.gbq import \ + _test_google_api_imports as gbq_test_google_api_imports +from pandas_gbq.gbq import GbqConnector class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin): @@ -42,12 +43,9 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin): """ conn_name_attr = 'bigquery_conn_id' - def __init__(self, - bigquery_conn_id='bigquery_default', - delegate_to=None): + def __init__(self, bigquery_conn_id='bigquery_default', delegate_to=None): super(BigQueryHook, self).__init__( - conn_id=bigquery_conn_id, - delegate_to=delegate_to) + conn_id=bigquery_conn_id, delegate_to=delegate_to) def get_conn(self): """ @@ -83,7 +81,8 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin): :param bql: The BigQuery SQL to execute. :type bql: string - :param parameters: The parameters to render the SQL query with (not used, leave to override superclass method) + :param parameters: The parameters to render the SQL query with (not + used, leave to override superclass method) :type parameters: mapping or iterable :param dialect: Dialect of BigQuery SQL – legacy SQL or standard SQL :type dialect: string in {'legacy', 'standard'}, default 'legacy' @@ -107,11 +106,12 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin): """ Checks for the existence of a table in Google BigQuery. - :param project_id: The Google cloud project in which to look for the table. The connection supplied to the hook - must provide access to the specified project. + :param project_id: The Google cloud project in which to look for the + table. 
The connection supplied to the hook must provide access to + the specified project. :type project_id: string - :param dataset_id: The name of the dataset in which to look for the table. - storage bucket. + :param dataset_id: The name of the dataset in which to look for the + table. :type dataset_id: string :param table_id: The name of the table to check the existence of. :type table_id: string @@ -119,10 +119,8 @@ class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin): service = self.get_service() try: service.tables().get( - projectId=project_id, - datasetId=dataset_id, - tableId=table_id - ).execute() + projectId=project_id, datasetId=dataset_id, + tableId=table_id).execute() return True except errors.HttpError as e: if e.resp['status'] == '404': @@ -138,7 +136,13 @@ class BigQueryPandasConnector(GbqConnector): without forcing a three legged OAuth connection. Instead, we can inject service account credentials into the binding. """ - def __init__(self, project_id, service, reauth=False, verbose=False, dialect='legacy'): + + def __init__(self, + project_id, + service, + reauth=False, + verbose=False, + dialect='legacy'): gbq_check_google_client_version() gbq_test_google_api_imports() self.project_id = project_id @@ -182,20 +186,23 @@ class BigQueryBaseCursor(LoggingMixin): BigQuery. The methods can be used directly by operators, in cases where a PEP 249 cursor isn't needed. """ + def __init__(self, service, project_id): self.service = service self.project_id = project_id self.running_job_id = None - def run_query( - self, bql, destination_dataset_table = False, - write_disposition = 'WRITE_EMPTY', - allow_large_results=False, - udf_config = False, - use_legacy_sql=True, - maximum_billing_tier=None, - create_disposition='CREATE_IF_NEEDED', - query_params=None): + def run_query(self, + bql, + destination_dataset_table=False, + write_disposition='WRITE_EMPTY', + allow_large_results=False, + udf_config=False, + use_legacy_sql=True, + maximum_billing_tier=None, + create_disposition='CREATE_IF_NEEDED', + query_params=None, + schema_update_options=()): """ Executes a BigQuery SQL query. Optionally persists results in a BigQuery table. See here: @@ -211,8 +218,6 @@ class BigQueryBaseCursor(LoggingMixin): :param write_disposition: What to do if the table already exists in BigQuery. :type write_disposition: string - :param create_disposition: Specifies whether the job is allowed to create new tables. - :type create_disposition: string :param allow_large_results: Whether to allow large results. :type allow_large_results: boolean :param udf_config: The User Defined Function configuration for the query. @@ -220,9 +225,34 @@ class BigQueryBaseCursor(LoggingMixin): :type udf_config: list :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false). :type use_legacy_sql: boolean - :param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price. + :param maximum_billing_tier: Positive integer that serves as a + multiplier of the basic price. :type maximum_billing_tier: integer + :param create_disposition: Specifies whether the job is allowed to + create new tables. + :type create_disposition: string + :param query_params a dictionary containing query parameter types and + values, passed to BigQuery + :type query_params: dict + :param schema_update_options: Allows the schema of the desitination + table to be updated as a side effect of the query job. 
+ :type schema_update_options: tuple """ + + # BigQuery also allows you to define how you want a table's schema to change + # as a side effect of a query job + # for more details: + # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.schemaUpdateOptions + allowed_schema_update_options = [ + 'ALLOW_FIELD_ADDITION', "ALLOW_FIELD_RELAXATION" + ] + if not set(allowed_schema_update_options).issuperset( + set(schema_update_options)): + raise ValueError( + "{0} contains invalid schema update options. " + "Please only use one or more of the following options: {1}" + .format(schema_update_options, allowed_schema_update_options)) + configuration = { 'query': { 'query': bql, @@ -239,9 +269,12 @@ class BigQueryBaseCursor(LoggingMixin): _split_tablename(table_input=destination_dataset_table, default_project_id=self.project_id) configuration['query'].update({ - 'allowLargeResults': allow_large_results, - 'writeDisposition': write_disposition, - 'createDisposition': create_disposition, + 'allowLargeResults': + allow_large_results, + 'writeDisposition': + write_disposition, + 'createDisposition': + create_disposition, 'destinationTable': { 'projectId': destination_project, 'datasetId': destination_dataset, @@ -251,17 +284,38 @@ class BigQueryBaseCursor(LoggingMixin): if udf_config: assert isinstance(udf_config, list) configuration['query'].update({ - 'userDefinedFunctionResources': udf_config + 'userDefinedFunctionResources': + udf_config }) if query_params: - configuration['query']['queryParameters'] = query_params + if use_legacy_sql: + raise ValueError("Query paramaters are not allowed when using " + "legacy SQL") + else: + configuration['query']['queryParameters'] = query_params + + if schema_update_options: + if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]: + raise ValueError("schema_update_options is only " + "allowed if write_disposition is " + "'WRITE_APPEND' or 'WRITE_TRUNCATE'.") + else: + self.log.info( + "Adding experimental " + "'schemaUpdateOptions': {0}".format(schema_update_options)) + configuration['query'][ + 'schemaUpdateOptions'] = schema_update_options return self.run_with_configuration(configuration) def run_extract( # noqa - self, source_project_dataset_table, destination_cloud_storage_uris, - compression='NONE', export_format='CSV', field_delimiter=',', + self, + source_project_dataset_table, + destination_cloud_storage_uris, + compression='NONE', + export_format='CSV', + field_delimiter=',', print_header=True): """ Executes a BigQuery extract command to copy data from BigQuery to @@ -344,10 +398,10 @@ class BigQueryBaseCursor(LoggingMixin): :param create_disposition: The create disposition if the table doesn't exist. 
:type create_disposition: string """ - source_project_dataset_tables = ( - [source_project_dataset_tables] - if not isinstance(source_project_dataset_tables, list) - else source_project_dataset_tables) + source_project_dataset_tables = ([ + source_project_dataset_tables + ] if not isinstance(source_project_dataset_tables, list) else + source_project_dataset_tables) source_project_dataset_tables_fixup = [] for source_project_dataset_table in source_project_dataset_tables: @@ -356,9 +410,12 @@ class BigQueryBaseCursor(LoggingMixin): default_project_id=self.project_id, var_name='source_project_dataset_table') source_project_dataset_tables_fixup.append({ - 'projectId': source_project, - 'datasetId': source_dataset, - 'tableId': source_table + 'projectId': + source_project, + 'datasetId': + source_dataset, + 'tableId': + source_table }) destination_project, destination_dataset, destination_table = \ @@ -381,7 +438,8 @@ class BigQueryBaseCursor(LoggingMixin): def run_load(self, destination_project_dataset_table, - schema_fields, source_uris, + schema_fields, + source_uris, source_format='CSV', create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, @@ -437,7 +495,7 @@ class BigQueryBaseCursor(LoggingMixin): :type allow_jagged_rows: bool :param schema_update_options: Allows the schema of the desitination table to be updated as a side effect of the load job. - :type schema_update_options: list + :type schema_update_options: tuple :param src_fmt_configs: configure optional fields specific to the source format :type src_fmt_configs: dict """ @@ -448,26 +506,28 @@ class BigQueryBaseCursor(LoggingMixin): # Refer to this link for more details: # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat source_format = source_format.upper() - allowed_formats = ["CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS", "DATASTORE_BACKUP"] + allowed_formats = [ + "CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS", + "DATASTORE_BACKUP" + ] if source_format not in allowed_formats: raise ValueError("{0} is not a valid source format. " - "Please use one of the following types: {1}" - .format(source_format, allowed_formats)) + "Please use one of the following types: {1}" + .format(source_format, allowed_formats)) # bigquery also allows you to define how you want a table's schema to change # as a side effect of a load # for more details: # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions allowed_schema_update_options = [ - 'ALLOW_FIELD_ADDITION', - "ALLOW_FIELD_RELAXATION" + 'ALLOW_FIELD_ADDITION', "ALLOW_FIELD_RELAXATION" ] - if not set(allowed_schema_update_options).issuperset(set(schema_update_options)): + if not set(allowed_schema_update_options).issuperset( + set(schema_update_options)): raise ValueError( "{0} contains invalid schema update options. 
" "Please only use one or more of the following options: {1}" - .format(schema_update_options, allowed_schema_update_options) - ) + .format(schema_update_options, allowed_schema_update_options)) destination_project, destination_dataset, destination_table = \ _split_tablename(table_input=destination_project_dataset_table, @@ -488,23 +548,19 @@ class BigQueryBaseCursor(LoggingMixin): } } if schema_fields: - configuration['load']['schema'] = { - 'fields': schema_fields - } + configuration['load']['schema'] = {'fields': schema_fields} if schema_update_options: if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]: - raise ValueError( - "schema_update_options is only " - "allowed if write_disposition is " - "'WRITE_APPEND' or 'WRITE_TRUNCATE'." - ) + raise ValueError("schema_update_options is only " + "allowed if write_disposition is " + "'WRITE_APPEND' or 'WRITE_TRUNCATE'.") else: self.log.info( "Adding experimental " - "'schemaUpdateOptions': {0}".format(schema_update_options) - ) - configuration['load']['schemaUpdateOptions'] = schema_update_options + "'schemaUpdateOptions': {0}".format(schema_update_options)) + configuration['load'][ + 'schemaUpdateOptions'] = schema_update_options if max_bad_records: configuration['load']['maxBadRecords'] = max_bad_records @@ -521,16 +577,20 @@ class BigQueryBaseCursor(LoggingMixin): src_fmt_configs['allowQuotedNewlines'] = allow_quoted_newlines src_fmt_to_configs_mapping = { - 'CSV': ['allowJaggedRows', 'allowQuotedNewlines', 'autodetect', - 'fieldDelimiter', 'skipLeadingRows', 'ignoreUnknownValues', - 'nullMarker', 'quote'], + 'CSV': [ + 'allowJaggedRows', 'allowQuotedNewlines', 'autodetect', + 'fieldDelimiter', 'skipLeadingRows', 'ignoreUnknownValues', + 'nullMarker', 'quote' + ], 'DATASTORE_BACKUP': ['projectionFields'], 'NEWLINE_DELIMITED_JSON': ['autodetect', 'ignoreUnknownValues'], 'AVRO': [], } valid_configs = src_fmt_to_configs_mapping[source_format] - src_fmt_configs = {k: v for k, v in src_fmt_configs.items() - if k in valid_configs} + src_fmt_configs = { + k: v + for k, v in src_fmt_configs.items() if k in valid_configs + } configuration['load'].update(src_fmt_configs) if allow_jagged_rows: @@ -552,9 +612,7 @@ class BigQueryBaseCursor(LoggingMixin): details. """ jobs = self.service.jobs() - job_data = { - 'configuration': configuration - } + job_data = {'configuration': configuration} # Send query and wait for reply. query_reply = jobs \ @@ -566,30 +624,34 @@ class BigQueryBaseCursor(LoggingMixin): keep_polling_job = True while (keep_polling_job): try: - job = jobs.get(projectId=self.project_id, jobId=self.running_job_id).execute() + job = jobs.get( + projectId=self.project_id, + jobId=self.running_job_id).execute() if (job['status']['state'] == 'DONE'): keep_polling_job = False # Check if job had errors. if 'errorResult' in job['status']: raise Exception( - 'BigQuery job failed. Final error was: {}. The job was: {}'.format( - job['status']['errorResult'], job - ) - ) + 'BigQuery job failed. Final error was: {}. The job was: {}'. 
+ format(job['status']['errorResult'], job)) else: - self.log.info('Waiting for job to complete : %s, %s', self.project_id, self.running_job_id) + self.log.info('Waiting for job to complete : %s, %s', + self.project_id, self.running_job_id) time.sleep(5) except HttpError as err: if err.resp.status in [500, 503]: - self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, self.running_job_id) + self.log.info( + '%s: Retryable error, waiting for job to complete: %s', + err.resp.status, self.running_job_id) time.sleep(5) else: raise Exception( - 'BigQuery job status check failed. Final error was: %s', err.resp.status) + 'BigQuery job status check failed. Final error was: %s', + err.resp.status) return self.running_job_id - + def poll_job_complete(self, job_id): jobs = self.service.jobs() try: @@ -598,39 +660,50 @@ class BigQueryBaseCursor(LoggingMixin): return True except HttpError as err: if err.resp.status in [500, 503]: - self.log.info('%s: Retryable error while polling job with id %s', err.resp.status, job_id) + self.log.info( + '%s: Retryable error while polling job with id %s', + err.resp.status, job_id) else: raise Exception( - 'BigQuery job status check failed. Final error was: %s', err.resp.status) + 'BigQuery job status check failed. Final error was: %s', + err.resp.status) return False - - + def cancel_query(self): """ Cancel all started queries that have not yet completed """ jobs = self.service.jobs() - if (self.running_job_id and not self.poll_job_complete(self.running_job_id)): - self.log.info('Attempting to cancel job : %s, %s', self.project_id, self.running_job_id) - jobs.cancel(projectId=self.project_id, jobId=self.running_job_id).execute() + if (self.running_job_id and + not self.poll_job_complete(self.running_job_id)): + self.log.info('Attempting to cancel job : %s, %s', self.project_id, + self.running_job_id) + jobs.cancel( + projectId=self.project_id, + jobId=self.running_job_id).execute() else: self.log.info('No running BigQuery jobs to cancel.') return - + # Wait for all the calls to cancel to finish max_polling_attempts = 12 polling_attempts = 0 - + job_complete = False while (polling_attempts < max_polling_attempts and not job_complete): - polling_attempts = polling_attempts+1 + polling_attempts = polling_attempts + 1 job_complete = self.poll_job_complete(self.running_job_id) if (job_complete): - self.log.info('Job successfully canceled: %s, %s', self.project_id, self.running_job_id) - elif(polling_attempts == max_polling_attempts): - self.log.info('Stopping polling due to timeout. Job with id %s has not completed cancel and may or may not finish.', self.running_job_id) + self.log.info('Job successfully canceled: %s, %s', + self.project_id, self.running_job_id) + elif (polling_attempts == max_polling_attempts): + self.log.info( + "Stopping polling due to timeout. 
Job with id %s " + "has not completed cancel and may or may not finish.", + self.running_job_id) else: - self.log.info('Waiting for canceled job with id %s to finish.', self.running_job_id) + self.log.info('Waiting for canceled job with id %s to finish.', + self.running_job_id) time.sleep(5) def get_schema(self, dataset_id, table_id): @@ -647,8 +720,12 @@ class BigQueryBaseCursor(LoggingMixin): .execute() return tables_resource['schema'] - def get_tabledata(self, dataset_id, table_id, - max_results=None, page_token=None, start_index=None): + def get_tabledata(self, + dataset_id, + table_id, + max_results=None, + page_token=None, + start_index=None): """ Get the data of a given dataset.table. see https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list @@ -668,15 +745,14 @@ class BigQueryBaseCursor(LoggingMixin): optional_params['pageToken'] = page_token if start_index: optional_params['startIndex'] = start_index - return ( - self.service.tabledata() - .list( - projectId=self.project_id, datasetId=dataset_id, - tableId=table_id, **optional_params) - .execute() - ) - - def run_table_delete(self, deletion_dataset_table, ignore_if_missing=False): + return (self.service.tabledata().list( + projectId=self.project_id, + datasetId=dataset_id, + tableId=table_id, + **optional_params).execute()) + + def run_table_delete(self, deletion_dataset_table, + ignore_if_missing=False): """ Delete an existing table from the dataset; If the table does not exist, return an error unless ignore_if_missing @@ -705,16 +781,14 @@ class BigQueryBaseCursor(LoggingMixin): datasetId=deletion_dataset, tableId=deletion_table) \ .execute() - self.log.info('Deleted table %s:%s.%s.', - deletion_project, deletion_dataset, deletion_table) + self.log.info('Deleted table %s:%s.%s.', deletion_project, + deletion_dataset, deletion_table) except HttpError: if not ignore_if_missing: - raise Exception( - 'Table deletion failed. Table does not exist.') + raise Exception('Table deletion failed. Table does not exist.') else: self.log.info('Table does not exist. Skipping.') - def run_table_upsert(self, dataset_id, table_resource, project_id=None): """ creates a new, empty table in the dataset; @@ -734,20 +808,19 @@ class BigQueryBaseCursor(LoggingMixin): # check to see if the table exists table_id = table_resource['tableReference']['tableId'] project_id = project_id if project_id is not None else self.project_id - tables_list_resp = self.service.tables().list(projectId=project_id, - datasetId=dataset_id).execute() + tables_list_resp = self.service.tables().list( + projectId=project_id, datasetId=dataset_id).execute() while True: for table in tables_list_resp.get('tables', []): if table['tableReference']['tableId'] == table_id: # found the table, do update - self.log.info( - 'Table %s:%s.%s exists, updating.', - project_id, dataset_id, table_id - ) - return self.service.tables().update(projectId=project_id, - datasetId=dataset_id, - tableId=table_id, - body=table_resource).execute() + self.log.info('Table %s:%s.%s exists, updating.', + project_id, dataset_id, table_id) + return self.service.tables().update( + projectId=project_id, + datasetId=dataset_id, + tableId=table_id, + body=table_resource).execute() # If there is a next page, we need to check the next page. if 'nextPageToken' in tables_list_resp: tables_list_resp = self.service.tables()\ @@ -758,20 +831,19 @@ class BigQueryBaseCursor(LoggingMixin): # If there is no next page, then the table doesn't exist. 
else: # do insert - self.log.info( - 'Table %s:%s.%s does not exist. creating.', - project_id, dataset_id, table_id - ) - return self.service.tables().insert(projectId=project_id, - datasetId=dataset_id, - body=table_resource).execute() + self.log.info('Table %s:%s.%s does not exist. creating.', + project_id, dataset_id, table_id) + return self.service.tables().insert( + projectId=project_id, + datasetId=dataset_id, + body=table_resource).execute() def run_grant_dataset_view_access(self, source_dataset, view_dataset, view_table, - source_project = None, - view_project = None): + source_project=None, + view_project=None): """ Grant authorized view access of a dataset to a view table. If this view has already been granted access to the dataset, do nothing. @@ -798,28 +870,36 @@ class BigQueryBaseCursor(LoggingMixin): # we don't want to clobber any existing accesses, so we have to get # info on the dataset before we can add view access - source_dataset_resource = self.service.datasets().get(projectId=source_project, - datasetId=source_dataset).execute() - access = source_dataset_resource['access'] if 'access' in source_dataset_resource else [] - view_access = {'view': {'projectId': view_project, - 'datasetId': view_dataset, - 'tableId': view_table}} + source_dataset_resource = self.service.datasets().get( + projectId=source_project, datasetId=source_dataset).execute() + access = source_dataset_resource[ + 'access'] if 'access' in source_dataset_resource else [] + view_access = { + 'view': { + 'projectId': view_project, + 'datasetId': view_dataset, + 'tableId': view_table + } + } # check to see if the view we want to add already exists. if view_access not in access: self.log.info( 'Granting table %s:%s.%s authorized view access to %s:%s dataset.', - view_project, view_dataset, view_table, source_project, source_dataset - ) + view_project, view_dataset, view_table, source_project, + source_dataset) access.append(view_access) - return self.service.datasets().patch(projectId=source_project, - datasetId=source_dataset, - body={'access': access}).execute() + return self.service.datasets().patch( + projectId=source_project, + datasetId=source_dataset, + body={ + 'access': access + }).execute() else: # if view is already in access, do nothing. self.log.info( 'Table %s:%s.%s already has authorized view access to %s:%s dataset.', - view_project, view_dataset, view_table, source_project, source_dataset - ) + view_project, view_dataset, view_table, source_project, + source_dataset) return source_dataset_resource @@ -833,7 +913,8 @@ class BigQueryCursor(BigQueryBaseCursor): """ def __init__(self, service, project_id): - super(BigQueryCursor, self).__init__(service=service, project_id=project_id) + super(BigQueryCursor, self).__init__( + service=service, project_id=project_id) self.buffersize = None self.page_token = None self.job_id = None @@ -863,7 +944,8 @@ class BigQueryCursor(BigQueryBaseCursor): :param parameters: Parameters to substitute into the query. 
:type parameters: dict """ - bql = _bind_parameters(operation, parameters) if parameters else operation + bql = _bind_parameters(operation, + parameters) if parameters else operation self.job_id = self.run_query(bql) def executemany(self, operation, seq_of_parameters): @@ -896,14 +978,10 @@ class BigQueryCursor(BigQueryBaseCursor): if self.all_pages_loaded: return None - query_results = ( - self.service.jobs() - .getQueryResults( - projectId=self.project_id, - jobId=self.job_id, - pageToken=self.page_token) - .execute() - ) + query_results = (self.service.jobs().getQueryResults( + projectId=self.project_id, + jobId=self.job_id, + pageToken=self.page_token).execute()) if 'rows' in query_results and query_results['rows']: self.page_token = query_results.get('pageToken') @@ -1038,10 +1116,9 @@ def _split_tablename(table_input, default_project_id, var_name=None): return "Format exception for {var}: ".format(var=var_name) if table_input.count('.') + table_input.count(':') > 3: - raise Exception(( - '{var}Use either : or . to specify project ' - 'got {input}' - ).format(var=var_print(var_name), input=table_input)) + raise Exception(('{var}Use either : or . to specify project ' + 'got {input}').format( + var=var_print(var_name), input=table_input)) cmpt = table_input.rsplit(':', 1) project_id = None @@ -1054,16 +1131,14 @@ def _split_tablename(table_input, default_project_id, var_name=None): project_id = cmpt[0] rest = cmpt[1] else: - raise Exception(( - '{var}Expect format of (., ' - 'got {input}' - ).format(var=var_print(var_name), input=table_input)) + raise Exception(('{var}Expect format of (.
, ' + 'got {input}').format( + var=var_print(var_name), input=table_input)) cmpt = rest.split('.') if len(cmpt) == 3: - assert project_id is None, ( - "{var}Use either : or . to specify project" - ).format(var=var_print(var_name)) + assert project_id is None, ("{var}Use either : or . to specify project" + ).format(var=var_print(var_name)) project_id = cmpt[0] dataset_id = cmpt[1] table_id = cmpt[2] @@ -1072,19 +1147,18 @@ def _split_tablename(table_input, default_project_id, var_name=None): dataset_id = cmpt[0] table_id = cmpt[1] else: - raise Exception(( - '{var}Expect format of (.
, ' - 'got {input}' - ).format(var=var_print(var_name), input=table_input)) + raise Exception( + ('{var}Expect format of (.
, ' + 'got {input}').format(var=var_print(var_name), input=table_input)) if project_id is None: if var_name is not None: log = LoggingMixin().log - log.info( - 'Project not included in {var}: {input}; using project "{project}"'.format( - var=var_name, input=table_input, project=default_project_id - ) - ) + log.info('Project not included in {var}: {input}; ' + 'using project "{project}"'.format( + var=var_name, + input=table_input, + project=default_project_id)) project_id = default_project_id return project_id, dataset_id, table_id http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/8ba86072/airflow/contrib/operators/bigquery_operator.py ---------------------------------------------------------------------- diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py index 19efa55..4b3ce75 100644 --- a/airflow/contrib/operators/bigquery_operator.py +++ b/airflow/contrib/operators/bigquery_operator.py @@ -46,16 +46,20 @@ class BigQueryOperator(BaseOperator): :type udf_config: list :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false). :type use_legacy_sql: boolean - :param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price. + :param maximum_billing_tier: Positive integer that serves as a multiplier + of the basic price. Defaults to None, in which case it uses the value set in the project. :type maximum_billing_tier: integer - :param query_params: a dictionary containing query parameter types and values, passed to - BigQuery. + :param schema_update_options: Allows the schema of the desitination + table to be updated as a side effect of the load job. + :type schema_update_options: tuple + :param query_params: a dictionary containing query parameter types and + values, passed to BigQuery. 
:type query_params: dict """ template_fields = ('bql', 'destination_dataset_table') - template_ext = ('.sql',) + template_ext = ('.sql', ) ui_color = '#e4f0e8' @apply_defaults @@ -70,6 +74,7 @@ class BigQueryOperator(BaseOperator): use_legacy_sql=True, maximum_billing_tier=None, create_disposition='CREATE_IF_NEEDED', + schema_update_options=(), query_params=None, *args, **kwargs): @@ -84,24 +89,32 @@ class BigQueryOperator(BaseOperator): self.udf_config = udf_config self.use_legacy_sql = use_legacy_sql self.maximum_billing_tier = maximum_billing_tier + self.schema_update_options = schema_update_options self.query_params = query_params self.bq_cursor = None def execute(self, context): - if(self.bq_cursor == None): + if self.bq_cursor is None: self.log.info('Executing: %s', self.bql) - hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id, - delegate_to=self.delegate_to) + hook = BigQueryHook( + bigquery_conn_id=self.bigquery_conn_id, + delegate_to=self.delegate_to) conn = hook.get_conn() self.bq_cursor = conn.cursor() - self.bq_cursor.run_query(self.bql, self.destination_dataset_table, self.write_disposition, - self.allow_large_results, self.udf_config, - self.use_legacy_sql, self.maximum_billing_tier, - self.create_disposition, self.query_params) - - + self.bq_cursor.run_query( + self.bql, + destination_dataset_table=self.destination_dataset_table, + write_disposition=self.write_disposition, + allow_large_results=self.allow_large_results, + udf_config=self.udf_config, + use_legacy_sql=self.use_legacy_sql, + maximum_billing_tier=self.maximum_billing_tier, + create_disposition=self.create_disposition, + query_params=self.query_params, + schema_update_options=self.schema_update_options) + def on_kill(self): super(BigQueryOperator, self).on_kill() - if(self.bq_cursor!=None): + if self.bq_cursor is not None: self.log.info('Canceling running query due to execution timeout') self.bq_cursor.cancel_query()
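----------------------------------------------------------------------

Usage sketch (not part of the commit): the DAG, task, connection, and table
names below are hypothetical placeholders. The change wires a new
schema_update_options argument from BigQueryOperator through to
BigQueryBaseCursor.run_query():

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

dag = DAG('bq_schema_update_example',
          start_date=datetime(2017, 12, 1),
          schedule_interval=None)

append_job = BigQueryOperator(
    task_id='append_with_schema_update',
    bql='SELECT * FROM `my_project.my_dataset.staging_table`',  # hypothetical
    destination_dataset_table='my_dataset.target_table',        # hypothetical
    use_legacy_sql=False,
    # schema_update_options is only honoured when the job appends to or
    # truncates the destination table.
    write_disposition='WRITE_APPEND',
    # New in this commit: allow the query job to add columns and/or relax
    # REQUIRED columns to NULLABLE on the destination table.
    schema_update_options=('ALLOW_FIELD_ADDITION', 'ALLOW_FIELD_RELAXATION'),
    bigquery_conn_id='bigquery_default',
    dag=dag)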
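At the hook level, run_query() validates the new argument before submitting
the job: options must be drawn from ALLOW_FIELD_ADDITION and
ALLOW_FIELD_RELAXATION, and write_disposition must be WRITE_APPEND or
WRITE_TRUNCATE, otherwise a ValueError is raised. A sketch of driving the
cursor directly, again with placeholder connection and table names:

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')
cursor = hook.get_conn().cursor()

# Accepted: a subset of the allowed options plus an appending disposition; the
# job configuration submitted to BigQuery carries 'schemaUpdateOptions' under
# its 'query' section.
cursor.run_query(
    bql='SELECT * FROM [my_dataset.staging_table]',   # hypothetical legacy SQL
    destination_dataset_table='my_dataset.target_table',
    write_disposition='WRITE_APPEND',
    schema_update_options=('ALLOW_FIELD_ADDITION',))

# Rejected with ValueError: 'DROP_FIELD' is not an allowed option.
# cursor.run_query(bql='SELECT 1',
#                  destination_dataset_table='my_dataset.target_table',
#                  write_disposition='WRITE_APPEND',
#                  schema_update_options=('DROP_FIELD',))

# Rejected with ValueError: the default write_disposition ('WRITE_EMPTY') does
# not permit schema updates as a side effect of the query.
# cursor.run_query(bql='SELECT 1',
#                  destination_dataset_table='my_dataset.target_table',
#                  schema_update_options=('ALLOW_FIELD_ADDITION',))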