@@ -157,25 +187,24 @@
 implementation for BigQuery.
 """
-from builtins import range
-from past.builtins import basestring
-
-import logging
 import time
-from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.hooks.dbapi_hook import DbApiHook
-from apiclient.discovery import build
-from pandas.io.gbq import GbqConnector, \
+from apiclient.discovery import build, HttpError
+from googleapiclient import errors
+from builtins import range
+from pandas_gbq.gbq import GbqConnector, \
     _parse_data as gbq_parse_data, \
     _check_google_client_version as gbq_check_google_client_version, \
     _test_google_api_imports as gbq_test_google_api_imports
-from pandas.tools.merge import concat
+from pandas.tools.merge import concat
+from past.builtins import basestring
-logging.getLogger("bigquery").setLevel(logging.INFO)
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.hooks.dbapi_hook import DbApiHook
+from airflow.utils.log.logging_mixin import LoggingMixin
 
 
-class BigQueryHook(GoogleCloudBaseHook, DbApiHook):
+class BigQueryHook(GoogleCloudBaseHook, DbApiHook, LoggingMixin):
     """
     Interact with BigQuery. This hook uses the Google Cloud Platform
     connection.
     """
@@ -184,8 +213,8 @@
     def __init__(self,
                  bigquery_conn_id='bigquery_default',
-                 delegate_to=None):
-        super(BigQueryHook, self).__init__(
+                 delegate_to=None):
+        super(BigQueryHook, self).__init__(
             conn_id=bigquery_conn_id,
             delegate_to=delegate_to)
@@ -204,7 +233,7 @@
         http_authorized = self._authorize()
         return build('bigquery', 'v2', http=http_authorized)
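# --- Illustrative usage sketch; not part of the patch above. Assumes an
# --- Airflow connection named 'bigquery_default' that carries valid GCP
# --- credentials and a default project; the printed identifiers depend on
# --- whatever datasets that project actually contains.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')

# get_service() hands back the authorized googleapiclient resource for the
# BigQuery v2 API, so any raw API call can be issued through it.
service = hook.get_service()
datasets = service.datasets().list(projectId=hook._get_field('project')).execute()
for dataset in datasets.get('datasets', []):
    print(dataset['datasetReference']['datasetId'])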
-    def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
+    def insert_rows(self, table, rows, target_fields=None, commit_every=1000):
         """
         Insertion is currently unsupported. Theoretically, you could use
         BigQuery's streaming API to insert rows into a table, but this hasn't
@@ -212,7 +241,7 @@
         """
         raise NotImplementedError()
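# --- Illustrative sketch; not part of the patch above. insert_rows() is left
# --- unimplemented on purpose, so one possible workaround is to call the
# --- BigQuery tabledata.insertAll API directly through the authorized
# --- service. Project, dataset and table names below are placeholders, and
# --- the target table is assumed to exist already.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')
service = hook.get_service()
response = service.tabledata().insertAll(
    projectId='my-project',
    datasetId='my_dataset',
    tableId='my_table',
    body={'rows': [{'json': {'name': 'alice', 'value': 1}}]}
).execute()
# insertAll reports per-row failures in the response rather than raising.
if response.get('insertErrors'):
    raise RuntimeError('Streaming insert failed: {}'.format(response['insertErrors']))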
-    def get_pandas_df(self, bql, parameters=None):
+    def get_pandas_df(self, bql, parameters=None, dialect='legacy'):
         """
         Returns a Pandas DataFrame for the results produced by a BigQuery
         query. The DbApiHook method must be overridden because Pandas
@@ -223,10 +252,14 @@
         :param bql: The BigQuery SQL to execute.
         :type bql: string
+        :param parameters: The parameters to render the SQL query with
+            (not used, leave to override superclass method)
+        :type parameters: mapping or iterable
+        :param dialect: Dialect of BigQuery SQL – legacy SQL or standard SQL
+        :type dialect: string in {'legacy', 'standard'}, default 'legacy'
         """
         service = self.get_service()
         project = self._get_field('project')
-        connector = BigQueryPandasConnector(project, service)
+        connector = BigQueryPandasConnector(project, service, dialect=dialect)
         schema, pages = connector.run_query(bql)
         dataframe_list = []
 
@@ -235,9 +268,35 @@
             dataframe_list.append(gbq_parse_data(schema, page))
 
         if len(dataframe_list) > 0:
-            return concat(dataframe_list, ignore_index=True)
+            return concat(dataframe_list, ignore_index=True)
         else:
-            return gbq_parse_data(schema, [])
+            return gbq_parse_data(schema, [])
+
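# --- Illustrative usage sketch; not part of the patch above. get_pandas_df()
# --- routes the query through BigQueryPandasConnector and returns a
# --- DataFrame; the new dialect argument selects legacy vs. standard SQL.
# --- The table reference below is a placeholder.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')
df = hook.get_pandas_df(
    'SELECT name, value FROM `my_dataset.my_table` LIMIT 10',
    dialect='standard')
print(df.head())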
+    def table_exists(self, project_id, dataset_id, table_id):
+        """
+        Checks for the existence of a table in Google BigQuery.
+
+        :param project_id: The Google cloud project in which to look for the
+            table. The connection supplied to the hook must provide access to
+            the specified project.
+        :type project_id: string
+        :param dataset_id: The name of the dataset in which to look for the
+            table.
+        :type dataset_id: string
+        :param table_id: The name of the table to check the existence of.
+        :type table_id: string
+        """
+        service = self.get_service()
+        try:
+            service.tables().get(
+                projectId=project_id,
+                datasetId=dataset_id,
+                tableId=table_id
+            ).execute()
+            return True
+        except errors.HttpError as e:
+            if e.resp['status'] == '404':
+                return False
+            raise
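# --- Illustrative usage sketch; not part of the patch above. table_exists()
# --- issues tables().get() and maps an HTTP 404 onto False, so callers can
# --- guard downstream work without catching the error themselves.
# --- Identifiers below are placeholders.
from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')
if hook.table_exists(project_id='my-project',
                     dataset_id='my_dataset',
                     table_id='my_table'):
    print('table is present, safe to query')
else:
    print('table is missing, create or load it first')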
class BigQueryPandasConnector(GbqConnector): @@ -248,13 +307,14 @@ without forcing a three legged OAuth connection. Instead, we can inject service account credentials into the binding. """ - def __init__(self, project_id, service, reauth=False, verbose=False): + def __init__(self, project_id, service, reauth=False, verbose=False, dialect='legacy'): gbq_check_google_client_version() gbq_test_google_api_imports() self.project_id = project_id self.reauth = reauth self.service = service self.verbose = verbose + self.dialect = dialect class BigQueryConnection(object): @@ -285,22 +345,25 @@ "BigQueryConnection does not have transactions") -class BigQueryBaseCursor(object): +class BigQueryBaseCursor(LoggingMixin): """ The BigQuery base cursor contains helper methods to execute queries against BigQuery. The methods can be used directly by operators, in cases where a PEP 249 cursor isn't needed. """ - def __init__(self, service, project_id): self.service = service self.project_id = project_id def run_query( - self, bql, destination_dataset_table = False, + self, bql, destination_dataset_table = False, write_disposition = 'WRITE_EMPTY', - allow_large_results=False, - udf_config = False): + allow_large_results=False, + udf_config = False, + use_legacy_sql=True, + maximum_billing_tier=None, + create_disposition='CREATE_IF_NEEDED', + query_params=None): """ Executes a BigQuery SQL query. Optionally persists results in a BigQuery table. See here: @@ -315,29 +378,40 @@ BigQuery table to save the query results. :param write_disposition: What to do if the table already exists in BigQuery. + :type write_disposition: string + :param create_disposition: Specifies whether the job is allowed to create new tables. + :type create_disposition: string :param allow_large_results: Whether to allow large results. :type allow_large_results: boolean :param udf_config: The User Defined Function configuration for the query. See https://cloud.google.com/bigquery/user-defined-functions for details. :type udf_config: list + :param use_legacy_sql: Whether to use legacy SQL (true) or standard SQL (false). + :type use_legacy_sql: boolean + :param maximum_billing_tier: Positive integer that serves as a multiplier of the basic price. + :type maximum_billing_tier: integer """ configuration = { 'query': { 'query': bql, + 'useLegacySql': use_legacy_sql, + 'maximumBillingTier': maximum_billing_tier } } if destination_dataset_table: assert '.' in destination_dataset_table, ( 'Expected destination_dataset_table in the format of ' - '<dataset>.<table>. Got: {}').format(destination_dataset_table) - destination_dataset, destination_table = \ - destination_dataset_table.split('.', 1) + '<dataset>.<table>. 
Got: {}').format(destination_dataset_table) + destination_project, destination_dataset, destination_table = \ + _split_tablename(table_input=destination_dataset_table, + default_project_id=self.project_id) configuration['query'].update({ 'allowLargeResults': allow_large_results, 'writeDisposition': write_disposition, + 'createDisposition': create_disposition, 'destinationTable': { - 'projectId': self.project_id, + 'projectId': destination_project, 'datasetId': destination_dataset, 'tableId': destination_table, } @@ -348,12 +422,15 @@ 'userDefinedFunctionResources': udf_config }) + if query_params: + configuration['query']['queryParameters'] = query_params + return self.run_with_configuration(configuration) def run_extract( # noqa self, source_project_dataset_table, destination_cloud_storage_uris, compression='NONE', export_format='CSV', field_delimiter=',', - print_header=True): + print_header=True): """ Executes a BigQuery extract command to copy data from BigQuery to Google Cloud Storage. See here: @@ -379,9 +456,12 @@ :param print_header: Whether to print a header for a CSV file extract. :type print_header: boolean """ + source_project, source_dataset, source_table = \ - self._split_project_dataset_table_input( - 'source_project_dataset_table', source_project_dataset_table) + _split_tablename(table_input=source_project_dataset_table, + default_project_id=self.project_id, + var_name='source_project_dataset_table') + configuration = { 'extract': { 'sourceTable': { @@ -418,14 +498,14 @@ For more details about these parameters. :param source_project_dataset_tables: One or more dotted - (<project>.)<dataset>.<table> + (project:|project.)<dataset>.<table> BigQuery tables to use as the source data. Use a list if there are multiple source tables. If <project> is not included, project will be the project defined in the connection json. :type source_project_dataset_tables: list|string :param destination_project_dataset_table: The destination BigQuery - table. Format is: <project>.<dataset>.<table> + table. Format is: (project:|project.)<dataset>.<table> :type destination_project_dataset_table: string :param write_disposition: The write disposition if the table already exists. :type write_disposition: string @@ -440,21 +520,18 @@ source_project_dataset_tables_fixup = [] for source_project_dataset_table in source_project_dataset_tables: source_project, source_dataset, source_table = \ - self._split_project_dataset_table_input( - 'source_project_dataset_table', source_project_dataset_table) + _split_tablename(table_input=source_project_dataset_table, + default_project_id=self.project_id, + var_name='source_project_dataset_table') source_project_dataset_tables_fixup.append({ 'projectId': source_project, 'datasetId': source_dataset, 'tableId': source_table }) - assert 3 == len(destination_project_dataset_table.split('.')), ( - 'Expected destination_project_dataset_table in the format of ' - '<project>.<dataset>.<table>. 
' - 'Got: {}').format(destination_project_dataset_table) - destination_project, destination_dataset, destination_table = \ - destination_project_dataset_table.split('.', 2) + _split_tablename(table_input=destination_project_dataset_table, + default_project_id=self.project_id) configuration = { 'copy': { 'createDisposition': create_disposition, @@ -477,7 +554,13 @@ create_disposition='CREATE_IF_NEEDED', skip_leading_rows=0, write_disposition='WRITE_EMPTY', - field_delimiter=','): + field_delimiter=',', + max_bad_records=0, + quote_character=None, + allow_quoted_newlines=False, + allow_jagged_rows=False, + schema_update_options=(), + src_fmt_configs={}): """ Executes a BigQuery load command to load data from Google Cloud Storage to BigQuery. See here: @@ -487,9 +570,9 @@ For more details about these parameters. :param destination_project_dataset_table: - The dotted (<project>.)<dataset>.<table> BigQuery table to load data into. - If <project> is not included, project will be the project defined in - the connection json. + The dotted (<project>.|<project>:)<dataset>.<table> BigQuery table to load + data into. If <project> is not included, project will be the project defined + in the connection json. :type destination_project_dataset_table: string :param schema_fields: The schema field list as defined here: https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load @@ -508,10 +591,56 @@ :type write_disposition: string :param field_delimiter: The delimiter to use when loading from a CSV. :type field_delimiter: string + :param max_bad_records: The maximum number of bad records that BigQuery can + ignore when running the job. + :type max_bad_records: int + :param quote_character: The value that is used to quote data sections in a CSV file. + :type quote_character: string + :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false). + :type allow_quoted_newlines: boolean + :param allow_jagged_rows: Accept rows that are missing trailing optional columns. + The missing values are treated as nulls. If false, records with missing trailing columns + are treated as bad records, and if there are too many bad records, an invalid error is + returned in the job result. Only applicable when soure_format is CSV. + :type allow_jagged_rows: bool + :param schema_update_options: Allows the schema of the desitination + table to be updated as a side effect of the load job. + :type schema_update_options: list + :param src_fmt_configs: configure optional fields specific to the source format + :type src_fmt_configs: dict """ + + # bigquery only allows certain source formats + # we check to make sure the passed source format is valid + # if it's not, we raise a ValueError + # Refer to this link for more details: + # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.query.tableDefinitions.(key).sourceFormat + source_format = source_format.upper() + allowed_formats = ["CSV", "NEWLINE_DELIMITED_JSON", "AVRO", "GOOGLE_SHEETS", "DATASTORE_BACKUP"] + if source_format not in allowed_formats: + raise ValueError("{0} is not a valid source format. 
" + "Please use one of the following types: {1}" + .format(source_format, allowed_formats)) + + # bigquery also allows you to define how you want a table's schema to change + # as a side effect of a load + # for more details: + # https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schemaUpdateOptions + allowed_schema_update_options = [ + 'ALLOW_FIELD_ADDITION', + "ALLOW_FIELD_RELAXATION" + ] + if not set(allowed_schema_update_options).issuperset(set(schema_update_options)): + raise ValueError( + "{0} contains invalid schema update options. " + "Please only use one or more of the following options: {1}" + .format(schema_update_options, allowed_schema_update_options) + ) + destination_project, destination_dataset, destination_table = \ - self._split_project_dataset_table_input( - 'destination_project_dataset_table', destination_project_dataset_table) + _split_tablename(table_input=destination_project_dataset_table, + default_project_id=self.project_id, + var_name='destination_project_dataset_table') configuration = { 'load': { @@ -521,43 +650,62 @@ 'datasetId': destination_dataset, 'tableId': destination_table, }, - 'schema': { - 'fields': schema_fields - }, 'sourceFormat': source_format, 'sourceUris': source_uris, 'writeDisposition': write_disposition, } } + if schema_fields: + configuration['load']['schema'] = { + 'fields': schema_fields + } + + if schema_update_options: + if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]: + raise ValueError( + "schema_update_options is only " + "allowed if write_disposition is " + "'WRITE_APPEND' or 'WRITE_TRUNCATE'." + ) + else: + self.log.info( + "Adding experimental " + "'schemaUpdateOptions': {0}".format(schema_update_options) + ) + configuration['load']['schemaUpdateOptions'] = schema_update_options + + if max_bad_records: + configuration['load']['maxBadRecords'] = max_bad_records + + # if following fields are not specified in src_fmt_configs, + # honor the top-level params for backward-compatibility + if 'skipLeadingRows' not in src_fmt_configs: + src_fmt_configs['skipLeadingRows'] = skip_leading_rows + if 'fieldDelimiter' not in src_fmt_configs: + src_fmt_configs['fieldDelimiter'] = field_delimiter + if quote_character: + src_fmt_configs['quote'] = quote_character + if allow_quoted_newlines: + src_fmt_configs['allowQuotedNewlines'] = allow_quoted_newlines + + src_fmt_to_configs_mapping = { + 'CSV': ['allowJaggedRows', 'allowQuotedNewlines', 'autodetect', + 'fieldDelimiter', 'skipLeadingRows', 'ignoreUnknownValues', + 'nullMarker', 'quote'], + 'DATASTORE_BACKUP': ['projectionFields'], + 'NEWLINE_DELIMITED_JSON': ['autodetect', 'ignoreUnknownValues'], + 'AVRO': [], + } + valid_configs = src_fmt_to_configs_mapping[source_format] + src_fmt_configs = {k: v for k, v in src_fmt_configs.items() + if k in valid_configs} + configuration['load'].update(src_fmt_configs) - if source_format == 'CSV': - configuration['load']['skipLeadingRows'] = skip_leading_rows - configuration['load']['fieldDelimiter'] = field_delimiter + if allow_jagged_rows: + configuration['load']['allowJaggedRows'] = allow_jagged_rows return self.run_with_configuration(configuration) - def _split_project_dataset_table_input(self, var_name, project_dataset_table): - """ - :param var_name: the name of the variable input, for logging and erroring purposes. - :type var_name: str - :param project_dataset_table: input string in (<project>.)<dataset>.<project> format. 
- if project is not included in the string, self.project_id will be returned in the tuple. - :type project_dataset_table: str - :return: (project, dataset, table) tuple - """ - table_split = project_dataset_table.split('.') - assert len(table_split) == 2 or len(table_split) == 3, ( - 'Expected {var} in the format of (<project.)<dataset>.<table>, ' - 'got {input}').format(var=var_name, input=project_dataset_table) - - if len(table_split) == 2: - logging.info('project not included in {var}: {input}; using project "{project}"'.format(var=var_name, input=project_dataset_table, project=self.project_id)) - dataset, table = table_split - return self.project_id, dataset, table - else: - project, dataset, table = table_split - return project, dataset, table - def run_with_configuration(self, configuration): """ Executes a BigQuery SQL query. See here: @@ -581,18 +729,32 @@ .insert(projectId=self.project_id, body=job_data) \ .execute() job_id = query_reply['jobReference']['jobId'] - job = jobs.get(projectId=self.project_id, jobId=job_id).execute() # Wait for query to finish. - while not job['status']['state'] == 'DONE': - logging.info('Waiting for job to complete: %s, %s', self.project_id, job_id) - time.sleep(5) - job = jobs.get(projectId=self.project_id, jobId=job_id).execute() - - # Check if job had errors. - if 'errorResult' in job['status']: - raise Exception( - 'BigQuery job failed. Final error was: %s', job['status']['errorResult']) + keep_polling_job = True + while (keep_polling_job): + try: + job = jobs.get(projectId=self.project_id, jobId=job_id).execute() + if (job['status']['state'] == 'DONE'): + keep_polling_job = False + # Check if job had errors. + if 'errorResult' in job['status']: + raise Exception( + 'BigQuery job failed. Final error was: {}. The job was: {}'.format( + job['status']['errorResult'], job + ) + ) + else: + self.log.info('Waiting for job to complete : %s, %s', self.project_id, job_id) + time.sleep(5) + + except HttpError as err: + if err.resp.status in [500, 503]: + self.log.info('%s: Retryable error, waiting for job to complete: %s', err.resp.status, job_id) + time.sleep(5) + else: + raise Exception( + 'BigQuery job status check failed. Final error was: %s', err.resp.status) return job_id @@ -611,7 +773,7 @@ return tables_resource['schema'] def get_tabledata(self, dataset_id, table_id, - max_results=None, page_token=None, start_index=None): + max_results=None, page_token=None, start_index=None): """ Get the data of a given dataset.table. see https://cloud.google.com/bigquery/docs/reference/v2/tabledata/list @@ -639,15 +801,56 @@ .execute() ) - def run_table_upsert(self, dataset_id, table_resource, project_id=None): + def run_table_delete(self, deletion_dataset_table, ignore_if_missing=False): + """ + Delete an existing table from the dataset; + If the table does not exist, return an error unless ignore_if_missing + is set to True. + + :param deletion_dataset_table: A dotted + (<project>.|<project>:)<dataset>.<table> that indicates which table + will be deleted. + :type deletion_dataset_table: str + :param ignore_if_missing: if True, then return success even if the + requested table does not exist. + :type ignore_if_missing: boolean + :return: + """ + + assert '.' in deletion_dataset_table, ( + 'Expected deletion_dataset_table in the format of ' + '<dataset>.<table>. 
Got: {}').format(deletion_dataset_table) + deletion_project, deletion_dataset, deletion_table = \ + _split_tablename(table_input=deletion_dataset_table, + default_project_id=self.project_id) + + try: + tables_resource = self.service.tables() \ + .delete(projectId=deletion_project, + datasetId=deletion_dataset, + tableId=deletion_table) \ + .execute() + self.log.info('Deleted table %s:%s.%s.', + deletion_project, deletion_dataset, deletion_table) + except HttpError: + if not ignore_if_missing: + raise Exception( + 'Table deletion failed. Table does not exist.') + else: + self.log.info('Table does not exist. Skipping.') + + + def run_table_upsert(self, dataset_id, table_resource, project_id=None): """ creates a new, empty table in the dataset; If the table already exists, update the existing table. Since BigQuery does not natively allow table upserts, this is not an atomic operation. + :param dataset_id: the dataset to upsert the table into. :type dataset_id: str - :param table_resource: a table resource. see https://cloud.google.com/bigquery/docs/reference/v2/tables#resource + :param table_resource: a table resource. see + https://cloud.google.com/bigquery/docs/reference/v2/tables#resource :type table_resource: dict :param project_id: the project to upsert the table into. If None, project will be self.project_id. @@ -655,15 +858,17 @@ """ # check to see if the table exists table_id = table_resource['tableReference']['tableId'] - project_id = project_id if project_id is not None else self.project_id + project_id = project_id if project_id is not None else self.project_id tables_list_resp = self.service.tables().list(projectId=project_id, datasetId=dataset_id).execute() - while True: + while True: for table in tables_list_resp.get('tables', []): if table['tableReference']['tableId'] == table_id: # found the table, do update - logging.info('table %s:%s.%s exists, updating.', - project_id, dataset_id, table_id) + self.log.info( + 'Table %s:%s.%s exists, updating.', + project_id, dataset_id, table_id + ) return self.service.tables().update(projectId=project_id, datasetId=dataset_id, tableId=table_id, @@ -678,8 +883,10 @@ # If there is no next page, then the table doesn't exist. else: # do insert - logging.info('table %s:%s.%s does not exist. creating.', - project_id, dataset_id, table_id) + self.log.info( + 'Table %s:%s.%s does not exist. creating.', + project_id, dataset_id, table_id + ) return self.service.tables().insert(projectId=project_id, datasetId=dataset_id, body=table_resource).execute() @@ -688,12 +895,13 @@ source_dataset, view_dataset, view_table, - source_project = None, - view_project = None): + source_project = None, + view_project = None): """ Grant authorized view access of a dataset to a view table. If this view has already been granted access to the dataset, do nothing. This method is not atomic. Running it may clobber a simultaneous update. + :param source_dataset: the source dataset :type source_dataset: str :param view_dataset: the dataset that the view is in @@ -723,18 +931,20 @@ 'tableId': view_table}} # check to see if the view we want to add already exists. if view_access not in access: - logging.info('granting table %s:%s.%s authorized view access to %s:%s dataset.', - view_project, view_dataset, view_table, - source_project, source_dataset) + self.log.info( + 'Granting table %s:%s.%s authorized view access to %s:%s dataset.', + view_proje