From: fokko@apache.org
To: commits@airflow.incubator.apache.org
Subject: incubator-airflow git commit: [AIRFLOW-2095] Add operator to create External BigQuery Table
Date: Sat, 10 Feb 2018 16:24:17 +0000 (UTC)

Repository: incubator-airflow
Updated Branches:
  refs/heads/master d5b5ae0ee -> 782288761


[AIRFLOW-2095] Add operator to create External BigQuery Table

- Added operator to create External BigQuery Table
- Added documentation
- Added tests
- Fixed documentation for GCS

Closes #3028 from kaxil/bq-external-tb-op


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/78228876
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/78228876
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/78228876

Branch: refs/heads/master
Commit: 782288761251489ebf6eef4c95585ad166c87a6f
Parents: d5b5ae0
Author: Kaxil Naik
Authored: Sat Feb 10 17:24:11 2018 +0100
Committer: Fokko Driesprong
Committed: Sat Feb 10 17:24:11 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/bigquery_operator.py  | 155 +++++++++++++++++++
 airflow/contrib/operators/gcs_list_operator.py  |  18 +--
 airflow/contrib/operators/gcs_to_bq.py          |   8 +-
 docs/code.rst                                   |   1 +
 docs/integration.rst                            |   8 +
 .../contrib/operators/test_bigquery_operator.py |  46 +++++-
 6 files changed, 222 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index b1d64b8..d43a65c 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -267,3 +267,158 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
             schema_fields=schema_fields,
             time_partitioning=self.time_partitioning
         )
+
+
+class BigQueryCreateExternalTableOperator(BaseOperator):
+    """
+    Creates a new external table in the dataset with the data in Google Cloud
+    Storage.
+
+    The schema to be used for the BigQuery table may be specified in one of
+    two ways. You may either directly pass the schema fields in, or you may
+    point the operator to a Google cloud storage object name. The object in
+    Google cloud storage must be a JSON file with the schema fields in it.
+
+    :param bucket: The bucket to point the external table to.
+    :type bucket: string
+    :param source_objects: List of Google cloud storage URIs to point table to.
+        If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI.
+    :type source_objects: list
+    :param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table>
+        BigQuery table to load data into. If <project> is not included, project will
+        be the project defined in the connection json.
+    :type destination_project_dataset_table: string
+    :param schema_fields: If set, the schema field list as defined here:
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
+
+        **Example**: ::
+
+            schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+                           {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
+
+        Should not be set when source_format is 'DATASTORE_BACKUP'.
+    :type schema_fields: list
+    :param schema_object: If set, a GCS object path pointing to a .json file that
+        contains the schema for the table.
+    :type schema_object: string
+    :param source_format: File format of the data.
+    :type source_format: string
+    :param compression: [Optional] The compression type of the data source.
+        Possible values include GZIP and NONE.
+        The default value is NONE.
+        This setting is ignored for Google Cloud Bigtable,
+        Google Cloud Datastore backups and Avro formats.
+    :type compression: string
+    :param skip_leading_rows: Number of rows to skip when loading from a CSV.
+    :type skip_leading_rows: int
+    :param field_delimiter: The delimiter to use for the CSV.
+    :type field_delimiter: string
+    :param max_bad_records: The maximum number of bad records that BigQuery can
+        ignore when running the job.
+    :type max_bad_records: int
+    :param quote_character: The value that is used to quote data sections in a CSV file.
+    :type quote_character: string
+    :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false).
+    :type allow_quoted_newlines: boolean
+    :param allow_jagged_rows: Accept rows that are missing trailing optional columns.
+        The missing values are treated as nulls. If false, records with missing trailing
+        columns are treated as bad records, and if there are too many bad records, an
+        invalid error is returned in the job result. Only applicable to CSV, ignored
+        for other formats.
+    :type allow_jagged_rows: bool
+    :param bigquery_conn_id: Reference to a specific BigQuery hook.
+    :type bigquery_conn_id: string
+    :param google_cloud_storage_conn_id: Reference to a specific Google
+        cloud storage hook.
+    :type google_cloud_storage_conn_id: string
+    :param delegate_to: The account to impersonate, if any. For this to
+        work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: string
+    :param src_fmt_configs: configure optional fields specific to the source format
+    :type src_fmt_configs: dict
+    """
+    template_fields = ('bucket', 'source_objects',
+                       'schema_object', 'destination_project_dataset_table')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 source_objects,
+                 destination_project_dataset_table,
+                 schema_fields=None,
+                 schema_object=None,
+                 source_format='CSV',
+                 compression='NONE',
+                 skip_leading_rows=0,
+                 field_delimiter=',',
+                 max_bad_records=0,
+                 quote_character=None,
+                 allow_quoted_newlines=False,
+                 allow_jagged_rows=False,
+                 bigquery_conn_id='bigquery_default',
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 delegate_to=None,
+                 src_fmt_configs={},
+                 *args, **kwargs):
+
+        super(BigQueryCreateExternalTableOperator, self).__init__(*args, **kwargs)
+
+        # GCS config
+        self.bucket = bucket
+        self.source_objects = source_objects
+        self.schema_object = schema_object
+
+        # BQ config
+        self.destination_project_dataset_table = destination_project_dataset_table
+        self.schema_fields = schema_fields
+        self.source_format = source_format
+        self.compression = compression
+        self.skip_leading_rows = skip_leading_rows
+        self.field_delimiter = field_delimiter
+        self.max_bad_records = max_bad_records
+        self.quote_character = quote_character
+        self.allow_quoted_newlines = allow_quoted_newlines
+        self.allow_jagged_rows = allow_jagged_rows
+
+        self.bigquery_conn_id = bigquery_conn_id
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.delegate_to = delegate_to
+
+        self.src_fmt_configs = src_fmt_configs
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+                               delegate_to=self.delegate_to)
+
+        if not self.schema_fields and self.schema_object \
+                and self.source_format != 'DATASTORE_BACKUP':
+            gcs_hook = GoogleCloudStorageHook(
+                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                delegate_to=self.delegate_to)
+            schema_fields = json.loads(gcs_hook.download(
+                self.bucket,
+                self.schema_object).decode("utf-8"))
+        else:
+            schema_fields = self.schema_fields
+
+        source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
+                       for source_object in self.source_objects]
+        conn = bq_hook.get_conn()
+        cursor = conn.cursor()
+
+        cursor.create_external_table(
+            external_project_dataset_table=self.destination_project_dataset_table,
+            schema_fields=schema_fields,
+            source_uris=source_uris,
+            source_format=self.source_format,
+            compression=self.compression,
+            skip_leading_rows=self.skip_leading_rows,
+            field_delimiter=self.field_delimiter,
+            max_bad_records=self.max_bad_records,
+            quote_character=self.quote_character,
+            allow_quoted_newlines=self.allow_quoted_newlines,
+            allow_jagged_rows=self.allow_jagged_rows,
+            src_fmt_configs=self.src_fmt_configs
+        )

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/airflow/contrib/operators/gcs_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_list_operator.py b/airflow/contrib/operators/gcs_list_operator.py
index c374551..3c49cc1 100644
--- a/airflow/contrib/operators/gcs_list_operator.py
+++ b/airflow/contrib/operators/gcs_list_operator.py
@@ -41,16 +41,16 @@ class GoogleCloudStorageListOperator(BaseOperator):
     :type delegate_to: string
 
     **Example**:
-        The following Operator would list all the Avro files from `sales/sales-2017`
-        folder in `data` bucket. ::
+        The following Operator would list all the Avro files from ``sales/sales-2017``
+        folder in ``data`` bucket. ::
 
-        GCS_Files = GoogleCloudStorageListOperator(
-            task_id='GCS_Files',
-            bucket='data',
-            prefix='sales/sales-2017/',
-            delimiter='.avro',
-            google_cloud_storage_conn_id=google_cloud_conn_id
-        )
+            GCS_Files = GoogleCloudStorageListOperator(
+                task_id='GCS_Files',
+                bucket='data',
+                prefix='sales/sales-2017/',
+                delimiter='.avro',
+                google_cloud_storage_conn_id=google_cloud_conn_id
+            )
     """
     template_fields = ('bucket', 'prefix', 'delimiter')
     ui_color = '#f0eee4'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index f68da7f..97b1ef0 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -48,10 +48,10 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
     :param source_format: File format to export.
     :type source_format: string
     :param compression: [Optional] The compression type of the data source.
-            Possible values include GZIP and NONE.
-            The default value is NONE.
-            This setting is ignored for Google Cloud Bigtable,
-            Google Cloud Datastore backups and Avro formats.
+        Possible values include GZIP and NONE.
+        The default value is NONE.
+        This setting is ignored for Google Cloud Bigtable,
+        Google Cloud Datastore backups and Avro formats.
     :type compression: string
     :param create_disposition: The create disposition if the table doesn't exist.
     :type create_disposition: string

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index a984f29..3f3811a 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -99,6 +99,7 @@ Community-contributed Operators
 .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator
 .. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
+.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
 .. autoclass:: airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator
 .. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index 261649b..9755136 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -349,6 +349,7 @@ BigQuery Operators
 - :ref:`BigQueryValueCheckOperator` : Performs a simple value check using SQL code.
 - :ref:`BigQueryIntervalCheckOperator` : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
 - :ref:`BigQueryCreateEmptyTableOperator` : Creates a new, empty table in the specified BigQuery dataset optionally with schema.
+- :ref:`BigQueryCreateExternalTableOperator` : Creates a new, external table in the dataset with the data in Google Cloud Storage.
 - :ref:`BigQueryOperator` : Executes BigQuery SQL queries in a specific BigQuery database.
 - :ref:`BigQueryToBigQueryOperator` : Copy a BigQuery table to another BigQuery table.
 - :ref:`BigQueryToCloudStorageOperator` : Transfers a BigQuery table to a Google Cloud Storage bucket
@@ -389,6 +390,13 @@ BigQueryCreateEmptyTableOperator
 
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
 
+.. _BigQueryCreateExternalTableOperator:
+
+BigQueryCreateExternalTableOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator
+
 .. _BigQueryOperator:
 
 BigQueryOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/tests/contrib/operators/test_bigquery_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index b7f7285..b84bff8 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -15,6 +15,8 @@
 import unittest
 
 from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
+from airflow.contrib.operators.bigquery_operator \
+    import BigQueryCreateExternalTableOperator
 
 try:
     from unittest import mock
@@ -24,10 +26,13 @@ except ImportError:
     except ImportError:
         mock = None
 
-TASK_ID = 'test-bq-create-empty-table-operator'
+TASK_ID = 'test-bq-create-table-operator'
 TEST_DATASET = 'test-dataset'
 TEST_PROJECT_ID = 'test-project'
 TEST_TABLE_ID = 'test-table-id'
+TEST_GCS_BUCKET = 'test-bucket'
+TEST_GCS_DATA = ['dir1/*.csv']
+TEST_SOURCE_FORMAT = 'CSV'
 
 
 class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase):
@@ -51,3 +56,42 @@ class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase):
             schema_fields=None,
             time_partitioning={}
         )
+
+
+class BigQueryCreateExternalTableOperatorTest(unittest.TestCase):
+
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_execute(self, mock_hook):
+        operator = BigQueryCreateExternalTableOperator(
+            task_id=TASK_ID,
+            destination_project_dataset_table='{}.{}'.format(
+                TEST_DATASET, TEST_TABLE_ID
+            ),
+            schema_fields=[],
+            bucket=TEST_GCS_BUCKET,
+            source_objects=TEST_GCS_DATA,
+            source_format=TEST_SOURCE_FORMAT
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn() \
+            .cursor() \
+            .create_external_table \
+            .assert_called_once_with(
+                external_project_dataset_table='{}.{}'.format(
+                    TEST_DATASET, TEST_TABLE_ID
+                ),
+                schema_fields=[],
+                source_uris=['gs://{}/{}'.format(TEST_GCS_BUCKET, source_object)
+                             for source_object in TEST_GCS_DATA],
+                source_format=TEST_SOURCE_FORMAT,
+                compression='NONE',
+                skip_leading_rows=0,
+                field_delimiter=',',
+                max_bad_records=0,
+                quote_character=None,
+                allow_quoted_newlines=False,
+                allow_jagged_rows=False,
+                src_fmt_configs={}
+            )
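

----------------------------------------------------------------------
For reference, a minimal usage sketch of the new operator. The DAG id, bucket,
object paths and destination table below are made-up placeholders and are not
part of this commit; only the operator name and its parameters come from the
diff above:

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import (
        BigQueryCreateExternalTableOperator
    )

    # Placeholder DAG purely for illustration.
    dag = DAG('example_bq_external_table',
              start_date=datetime(2018, 2, 1),
              schedule_interval=None)

    # Point an external BigQuery table at CSV files already sitting in GCS,
    # passing the schema fields inline.
    create_external_table = BigQueryCreateExternalTableOperator(
        task_id='create_external_table',
        bucket='my-data-bucket',                      # placeholder bucket
        source_objects=['sales/2018/*.csv'],          # placeholder objects
        destination_project_dataset_table='my_dataset.external_sales',
        schema_fields=[
            {'name': 'emp_name', 'type': 'STRING', 'mode': 'REQUIRED'},
            {'name': 'salary', 'type': 'INTEGER', 'mode': 'NULLABLE'},
        ],
        source_format='CSV',
        skip_leading_rows=1,
        dag=dag)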
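
The schema can also be read from a JSON file in GCS instead of being passed
inline. A sketch of that variant, with the same placeholder names (the schema
file path is also hypothetical); per the execute() logic above, schema_object
is only consulted when schema_fields is not set and source_format is not
'DATASTORE_BACKUP':

    # The object referenced by schema_object is expected to hold the schema
    # fields as JSON, e.g.:
    #   [{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
    #    {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
    create_external_table_from_schema_file = BigQueryCreateExternalTableOperator(
        task_id='create_external_table_from_schema_file',
        bucket='my-data-bucket',                      # placeholder bucket
        source_objects=['sales/2018/*.csv'],          # placeholder objects
        schema_object='schemas/sales_schema.json',    # placeholder schema file
        destination_project_dataset_table='my_dataset.external_sales',
        source_format='CSV',
        dag=dag)                                      # same placeholder DAG as above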