airflow-commits mailing list archives

From fo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-2095] Add operator to create External BigQuery Table
Date Sat, 10 Feb 2018 16:24:17 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master d5b5ae0ee -> 782288761


[AIRFLOW-2095] Add operator to create External BigQuery Table

- Added operator to create External BigQuery Table
- Added documentation
- Added tests
- Fixed documentation for GCS

Closes #3028 from kaxil/bq-external-tb-op
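
For orientation, a minimal, hypothetical DAG sketch using the operator this commit adds. The bucket, object glob, and destination table below are placeholders rather than values from the commit; the schema fields reuse the example from the operator's docstring in the diff, and all other arguments are left at their defaults.

    from datetime import datetime

    from airflow import DAG
    from airflow.contrib.operators.bigquery_operator import (
        BigQueryCreateExternalTableOperator,
    )

    with DAG(dag_id='bq_external_table_example',
             start_date=datetime(2018, 2, 1),
             schedule_interval=None) as dag:

        # Expose CSV files already sitting in GCS as a queryable external table.
        create_external_table = BigQueryCreateExternalTableOperator(
            task_id='create_external_table',
            bucket='my-data-bucket',                    # placeholder bucket
            source_objects=['sales/2018/*.csv'],        # placeholder object glob
            destination_project_dataset_table='my_dataset.sales_external',
            schema_fields=[
                {'name': 'emp_name', 'type': 'STRING', 'mode': 'REQUIRED'},
                {'name': 'salary', 'type': 'INTEGER', 'mode': 'NULLABLE'},
            ],
            source_format='CSV',
            skip_leading_rows=1,
        )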


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/78228876
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/78228876
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/78228876

Branch: refs/heads/master
Commit: 782288761251489ebf6eef4c95585ad166c87a6f
Parents: d5b5ae0
Author: Kaxil Naik <kaxilnaik@gmail.com>
Authored: Sat Feb 10 17:24:11 2018 +0100
Committer: Fokko Driesprong <fokkodriesprong@godatadriven.com>
Committed: Sat Feb 10 17:24:11 2018 +0100

----------------------------------------------------------------------
 airflow/contrib/operators/bigquery_operator.py  | 155 +++++++++++++++++++
 airflow/contrib/operators/gcs_list_operator.py  |  18 +--
 airflow/contrib/operators/gcs_to_bq.py          |   8 +-
 docs/code.rst                                   |   1 +
 docs/integration.rst                            |   8 +
 .../contrib/operators/test_bigquery_operator.py |  46 +++++-
 6 files changed, 222 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/airflow/contrib/operators/bigquery_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/bigquery_operator.py b/airflow/contrib/operators/bigquery_operator.py
index b1d64b8..d43a65c 100644
--- a/airflow/contrib/operators/bigquery_operator.py
+++ b/airflow/contrib/operators/bigquery_operator.py
@@ -267,3 +267,158 @@ class BigQueryCreateEmptyTableOperator(BaseOperator):
             schema_fields=schema_fields,
             time_partitioning=self.time_partitioning
         )
+
+
+class BigQueryCreateExternalTableOperator(BaseOperator):
+    """
+    Creates a new external table in the dataset with the data in Google Cloud
+    Storage.
+
+    The schema to be used for the BigQuery table may be specified in one of
+    two ways. You may either directly pass the schema fields in, or you may
+    point the operator to a Google cloud storage object name. The object in
+    Google cloud storage must be a JSON file with the schema fields in it.
+
+    :param bucket: The bucket to point the external table to.
+    :type bucket: string
+    :param source_objects: List of Google cloud storage URIs to point table to.
+        If source_format is 'DATASTORE_BACKUP', the list must only contain a single URI.
+    :type source_objects: list
+    :param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table>
+        BigQuery table to load data into. If <project> is not included, project will
+        be the project defined in the connection json.
+    :type destination_project_dataset_table: string
+    :param schema_fields: If set, the schema field list as defined here:
+        https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs#configuration.load.schema
+
+        **Example**: ::
+
+            schema_fields=[{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
+                           {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
+
+        Should not be set when source_format is 'DATASTORE_BACKUP'.
+    :type schema_fields: list
+    :param schema_object: If set, a GCS object path pointing to a .json file that
+        contains the schema for the table.
+    :type schema_object: string
+    :param source_format: File format of the data.
+    :type source_format: string
+    :param compression: [Optional] The compression type of the data source.
+        Possible values include GZIP and NONE.
+        The default value is NONE.
+        This setting is ignored for Google Cloud Bigtable,
+        Google Cloud Datastore backups and Avro formats.
+    :type compression: string
+    :param skip_leading_rows: Number of rows to skip when loading from a CSV.
+    :type skip_leading_rows: int
+    :param field_delimiter: The delimiter to use for the CSV.
+    :type field_delimiter: string
+    :param max_bad_records: The maximum number of bad records that BigQuery can
+        ignore when running the job.
+    :type max_bad_records: int
+    :param quote_character: The value that is used to quote data sections in a CSV file.
+    :type quote_character: string
+    :param allow_quoted_newlines: Whether to allow quoted newlines (true) or not (false).
+    :type allow_quoted_newlines: boolean
+    :param allow_jagged_rows: Accept rows that are missing trailing optional columns.
+        The missing values are treated as nulls. If false, records with missing trailing
+        columns are treated as bad records, and if there are too many bad records, an
+        invalid error is returned in the job result. Only applicable to CSV, ignored
+        for other formats.
+    :type allow_jagged_rows: bool
+    :param bigquery_conn_id: Reference to a specific BigQuery hook.
+    :type bigquery_conn_id: string
+    :param google_cloud_storage_conn_id: Reference to a specific Google
+        cloud storage hook.
+    :type google_cloud_storage_conn_id: string
+    :param delegate_to: The account to impersonate, if any. For this to
+        work, the service account making the request must have domain-wide
+        delegation enabled.
+    :type delegate_to: string
+    :param src_fmt_configs: configure optional fields specific to the source format
+    :type src_fmt_configs: dict
+    """
+    template_fields = ('bucket', 'source_objects',
+                       'schema_object', 'destination_project_dataset_table')
+    ui_color = '#f0eee4'
+
+    @apply_defaults
+    def __init__(self,
+                 bucket,
+                 source_objects,
+                 destination_project_dataset_table,
+                 schema_fields=None,
+                 schema_object=None,
+                 source_format='CSV',
+                 compression='NONE',
+                 skip_leading_rows=0,
+                 field_delimiter=',',
+                 max_bad_records=0,
+                 quote_character=None,
+                 allow_quoted_newlines=False,
+                 allow_jagged_rows=False,
+                 bigquery_conn_id='bigquery_default',
+                 google_cloud_storage_conn_id='google_cloud_storage_default',
+                 delegate_to=None,
+                 src_fmt_configs={},
+                 *args, **kwargs):
+
+        super(BigQueryCreateExternalTableOperator, self).__init__(*args, **kwargs)
+
+        # GCS config
+        self.bucket = bucket
+        self.source_objects = source_objects
+        self.schema_object = schema_object
+
+        # BQ config
+        self.destination_project_dataset_table = destination_project_dataset_table
+        self.schema_fields = schema_fields
+        self.source_format = source_format
+        self.compression = compression
+        self.skip_leading_rows = skip_leading_rows
+        self.field_delimiter = field_delimiter
+        self.max_bad_records = max_bad_records
+        self.quote_character = quote_character
+        self.allow_quoted_newlines = allow_quoted_newlines
+        self.allow_jagged_rows = allow_jagged_rows
+
+        self.bigquery_conn_id = bigquery_conn_id
+        self.google_cloud_storage_conn_id = google_cloud_storage_conn_id
+        self.delegate_to = delegate_to
+
+        self.src_fmt_configs = src_fmt_configs
+
+    def execute(self, context):
+        bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
+                               delegate_to=self.delegate_to)
+
+        if not self.schema_fields and self.schema_object \
+                and self.source_format != 'DATASTORE_BACKUP':
+            gcs_hook = GoogleCloudStorageHook(
+                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                delegate_to=self.delegate_to)
+            schema_fields = json.loads(gcs_hook.download(
+                self.bucket,
+                self.schema_object).decode("utf-8"))
+        else:
+            schema_fields = self.schema_fields
+
+        source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
+                       for source_object in self.source_objects]
+        conn = bq_hook.get_conn()
+        cursor = conn.cursor()
+
+        cursor.create_external_table(
+            external_project_dataset_table=self.destination_project_dataset_table,
+            schema_fields=schema_fields,
+            source_uris=source_uris,
+            source_format=self.source_format,
+            compression=self.compression,
+            skip_leading_rows=self.skip_leading_rows,
+            field_delimiter=self.field_delimiter,
+            max_bad_records=self.max_bad_records,
+            quote_character=self.quote_character,
+            allow_quoted_newlines=self.allow_quoted_newlines,
+            allow_jagged_rows=self.allow_jagged_rows,
+            src_fmt_configs=self.src_fmt_configs
+        )

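As the execute() method above shows, the operator can also read its schema from a JSON object in Google Cloud Storage via schema_object instead of taking schema_fields inline. A hedged sketch of that variant; the bucket, object paths, and table name are placeholders, and the JSON file is assumed to contain a list in the same schema_fields format shown in the docstring.

    from airflow.contrib.operators.bigquery_operator import (
        BigQueryCreateExternalTableOperator,
    )

    # Placeholder names throughout. gs://my-data-bucket/schemas/sales_schema.json
    # is assumed to hold a JSON list in the schema_fields format, e.g.
    # [{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
    #  {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"}]
    create_sales_external_table = BigQueryCreateExternalTableOperator(
        task_id='create_sales_external_table',
        bucket='my-data-bucket',
        source_objects=['sales/2018/*.csv'],
        destination_project_dataset_table='my_dataset.sales_external',
        schema_object='schemas/sales_schema.json',  # downloaded and parsed in execute()
        source_format='CSV',
        bigquery_conn_id='bigquery_default',
        google_cloud_storage_conn_id='google_cloud_storage_default',
    )
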
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/airflow/contrib/operators/gcs_list_operator.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_list_operator.py b/airflow/contrib/operators/gcs_list_operator.py
index c374551..3c49cc1 100644
--- a/airflow/contrib/operators/gcs_list_operator.py
+++ b/airflow/contrib/operators/gcs_list_operator.py
@@ -41,16 +41,16 @@ class GoogleCloudStorageListOperator(BaseOperator):
     :type delegate_to: string
 
     **Example**:
-        The following Operator would list all the Avro files from `sales/sales-2017`
-        folder in `data` bucket. ::
+        The following Operator would list all the Avro files from ``sales/sales-2017``
+        folder in ``data`` bucket. ::
 
-        GCS_Files = GoogleCloudStorageListOperator(
-            task_id='GCS_Files',
-            bucket='data',
-            prefix='sales/sales-2017/',
-            delimiter='.avro',
-            google_cloud_storage_conn_id=google_cloud_conn_id
-        )
+            GCS_Files = GoogleCloudStorageListOperator(
+                task_id='GCS_Files',
+                bucket='data',
+                prefix='sales/sales-2017/',
+                delimiter='.avro',
+                google_cloud_storage_conn_id=google_cloud_conn_id
+            )
     """
     template_fields = ('bucket', 'prefix', 'delimiter')
     ui_color = '#f0eee4'

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index f68da7f..97b1ef0 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -48,10 +48,10 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
     :param source_format: File format to export.
     :type source_format: string
     :param compression: [Optional] The compression type of the data source.
-            Possible values include GZIP and NONE.
-            The default value is NONE.
-            This setting is ignored for Google Cloud Bigtable,
-                Google Cloud Datastore backups and Avro formats.
+        Possible values include GZIP and NONE.
+        The default value is NONE.
+        This setting is ignored for Google Cloud Bigtable,
+        Google Cloud Datastore backups and Avro formats.
     :type compression: string
     :param create_disposition: The create disposition if the table doesn't exist.
     :type create_disposition: string

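The compression parameter documented in this hunk is used the same way when loading into a native table. A hedged sketch of a gzip-compressed CSV load with GoogleCloudStorageToBigQueryOperator follows; the bucket, objects, and table are placeholders, and the argument names beyond source_format and compression are assumed to match the operator's existing signature.

    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

    # Hedged sketch with placeholder names. Per the docstring above, compression
    # is ignored for Google Cloud Bigtable, Datastore backups and Avro.
    load_compressed_sales = GoogleCloudStorageToBigQueryOperator(
        task_id='load_compressed_sales',
        bucket='my-data-bucket',
        source_objects=['sales/2018/part-*.csv.gz'],
        destination_project_dataset_table='my_dataset.sales',
        source_format='CSV',
        compression='GZIP',
        skip_leading_rows=1,
    )
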
http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/docs/code.rst
----------------------------------------------------------------------
diff --git a/docs/code.rst b/docs/code.rst
index a984f29..3f3811a 100644
--- a/docs/code.rst
+++ b/docs/code.rst
@@ -99,6 +99,7 @@ Community-contributed Operators
 .. autoclass:: airflow.contrib.operators.bigquery_check_operator.BigQueryIntervalCheckOperator
 .. autoclass:: airflow.contrib.operators.bigquery_get_data.BigQueryGetDataOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
+.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryOperator
 .. autoclass:: airflow.contrib.operators.bigquery_table_delete_operator.BigQueryTableDeleteOperator
 .. autoclass:: airflow.contrib.operators.bigquery_to_bigquery.BigQueryToBigQueryOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/docs/integration.rst
----------------------------------------------------------------------
diff --git a/docs/integration.rst b/docs/integration.rst
index 261649b..9755136 100644
--- a/docs/integration.rst
+++ b/docs/integration.rst
@@ -349,6 +349,7 @@ BigQuery Operators
 - :ref:`BigQueryValueCheckOperator` : Performs a simple value check using SQL code.
 - :ref:`BigQueryIntervalCheckOperator` : Checks that the values of metrics given as SQL expressions are within a certain tolerance of the ones from days_back before.
 - :ref:`BigQueryCreateEmptyTableOperator` : Creates a new, empty table in the specified BigQuery dataset optionally with schema.
+- :ref:`BigQueryCreateExternalTableOperator` : Creates a new, external table in the dataset with the data in Google Cloud Storage.
 - :ref:`BigQueryOperator` : Executes BigQuery SQL queries in a specific BigQuery database.
 - :ref:`BigQueryToBigQueryOperator` : Copy a BigQuery table to another BigQuery table.
 - :ref:`BigQueryToCloudStorageOperator` : Transfers a BigQuery table to a Google Cloud Storage bucket
@@ -389,6 +390,13 @@ BigQueryCreateEmptyTableOperator
 
 .. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateEmptyTableOperator
 
+.. _BigQueryCreateExternalTableOperator:
+
+BigQueryCreateExternalTableOperator
+^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
+
+.. autoclass:: airflow.contrib.operators.bigquery_operator.BigQueryCreateExternalTableOperator
+
 .. _BigQueryOperator:
 
 BigQueryOperator

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/78228876/tests/contrib/operators/test_bigquery_operator.py
----------------------------------------------------------------------
diff --git a/tests/contrib/operators/test_bigquery_operator.py b/tests/contrib/operators/test_bigquery_operator.py
index b7f7285..b84bff8 100644
--- a/tests/contrib/operators/test_bigquery_operator.py
+++ b/tests/contrib/operators/test_bigquery_operator.py
@@ -15,6 +15,8 @@
 import unittest
 
 from airflow.contrib.operators.bigquery_operator import BigQueryCreateEmptyTableOperator
+from airflow.contrib.operators.bigquery_operator \
+    import BigQueryCreateExternalTableOperator
 
 try:
     from unittest import mock
@@ -24,10 +26,13 @@ except ImportError:
     except ImportError:
         mock = None
 
-TASK_ID = 'test-bq-create-empty-table-operator'
+TASK_ID = 'test-bq-create-table-operator'
 TEST_DATASET = 'test-dataset'
 TEST_PROJECT_ID = 'test-project'
 TEST_TABLE_ID = 'test-table-id'
+TEST_GCS_BUCKET = 'test-bucket'
+TEST_GCS_DATA = ['dir1/*.csv']
+TEST_SOURCE_FORMAT = 'CSV'
 
 
 class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase):
@@ -51,3 +56,42 @@ class BigQueryCreateEmptyTableOperatorTest(unittest.TestCase):
                 schema_fields=None,
                 time_partitioning={}
             )
+
+
+class BigQueryCreateExternalTableOperatorTest(unittest.TestCase):
+
+    @mock.patch('airflow.contrib.operators.bigquery_operator.BigQueryHook')
+    def test_execute(self, mock_hook):
+        operator = BigQueryCreateExternalTableOperator(
+            task_id=TASK_ID,
+            destination_project_dataset_table='{}.{}'.format(
+                TEST_DATASET, TEST_TABLE_ID
+            ),
+            schema_fields=[],
+            bucket=TEST_GCS_BUCKET,
+            source_objects=TEST_GCS_DATA,
+            source_format=TEST_SOURCE_FORMAT
+        )
+
+        operator.execute(None)
+        mock_hook.return_value \
+            .get_conn() \
+            .cursor() \
+            .create_external_table \
+            .assert_called_once_with(
+                external_project_dataset_table='{}.{}'.format(
+                    TEST_DATASET, TEST_TABLE_ID
+                ),
+                schema_fields=[],
+                source_uris=['gs://{}/{}'.format(TEST_GCS_BUCKET, source_object)
+                             for source_object in TEST_GCS_DATA],
+                source_format=TEST_SOURCE_FORMAT,
+                compression='NONE',
+                skip_leading_rows=0,
+                field_delimiter=',',
+                max_bad_records=0,
+                quote_character=None,
+                allow_quoted_newlines=False,
+                allow_jagged_rows=False,
+                src_fmt_configs={}
+            )

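To run just the new test class locally, a plain unittest invocation along these lines should work, assuming the repository root is on the Python path; it relies only on the standard unittest API, not on any Airflow-specific test runner.

    import unittest

    from tests.contrib.operators.test_bigquery_operator import (
        BigQueryCreateExternalTableOperatorTest,
    )

    # Load and run only the test case added in this commit.
    suite = unittest.TestLoader().loadTestsFromTestCase(
        BigQueryCreateExternalTableOperatorTest)
    unittest.TextTestRunner(verbosity=2).run(suite)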
