airflow-commits mailing list archives

From bo...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-716] Allow AVRO BigQuery load-job without schema
Date Thu, 29 Dec 2016 21:57:19 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 7220e72e3 -> 410736dbc


[AIRFLOW-716] Allow AVRO BigQuery load-job without schema

A load job can now be run without specifying the
schema fields or a schema object. This allows
loading files with an embedded schema, such as
AVRO files.

Also changed the optional arguments to default to
None instead of False, which is a bit more
Pythonic without breaking compatibility.

Closes #1958 from
alexvanboxel/feature/bq_load_avro
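
For illustration only (not part of the commit): a minimal operator invocation that
relies on the schema embedded in the AVRO files, so neither schema_fields nor
schema_object is passed. The bucket, object, table names and DAG below are hypothetical.

    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

    load_avro = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_avro',
        bucket='example-bucket',                      # hypothetical bucket
        source_objects=['events/part-*.avro'],        # hypothetical objects
        destination_project_dataset_table='example_dataset.events',
        source_format='AVRO',                         # schema is taken from the files
        write_disposition='WRITE_TRUNCATE',
        dag=dag)                                      # assumes an existing DAG object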


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/410736db
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/410736db
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/410736db

Branch: refs/heads/master
Commit: 410736dbc440d643fb3ea5f2094b64b8d4e3ba3b
Parents: 7220e72
Author: Alex Van Boxel <alex@vanboxel.be>
Authored: Thu Dec 29 22:57:11 2016 +0100
Committer: Bolke de Bruin <bolke@xs4all.nl>
Committed: Thu Dec 29 22:57:11 2016 +0100

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py | 17 ++++++------
 airflow/contrib/operators/gcs_to_bq.py | 41 ++++++++++++++++++++---------
 2 files changed, 37 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/410736db/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 450ee7a..d796565 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -18,20 +18,20 @@ This module contains a BigQuery Hook, as well as a very basic PEP 249
 implementation for BigQuery.
 """
 
-from builtins import range
-from past.builtins import basestring
-
 import logging
 import time
 
-from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
-from airflow.hooks.dbapi_hook import DbApiHook
 from apiclient.discovery import build, HttpError
+from builtins import range
 from pandas.io.gbq import GbqConnector, \
     _parse_data as gbq_parse_data, \
     _check_google_client_version as gbq_check_google_client_version, \
     _test_google_api_imports as gbq_test_google_api_imports
 from pandas.tools.merge import concat
+from past.builtins import basestring
+
+from airflow.contrib.hooks.gcp_api_base_hook import GoogleCloudBaseHook
+from airflow.hooks.dbapi_hook import DbApiHook
 
 logging.getLogger("bigquery").setLevel(logging.INFO)
 
@@ -418,14 +418,15 @@ class BigQueryBaseCursor(object):
                     'datasetId': destination_dataset,
                     'tableId': destination_table,
                 },
-                'schema': {
-                    'fields': schema_fields
-                },
                 'sourceFormat': source_format,
                 'sourceUris': source_uris,
                 'writeDisposition': write_disposition,
             }
         }
+        if schema_fields:
+            configuration['load']['schema'] = {
+                'fields': schema_fields
+            }
 
         if schema_update_options:
             if write_disposition not in ["WRITE_APPEND", "WRITE_TRUNCATE"]:
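
For illustration only (not part of the commit): with the 'schema' block now added to
the load configuration only when schema_fields is given, a hook-level AVRO load can
omit the schema entirely. This is a hedged sketch; the connection id, table and bucket
names are made up, and it assumes run_load accepts schema_fields=None after this change.

    from airflow.contrib.hooks.bigquery_hook import BigQueryHook

    bq_hook = BigQueryHook(bigquery_conn_id='bigquery_default')
    cursor = bq_hook.get_conn().cursor()
    cursor.run_load(
        destination_project_dataset_table='example_dataset.events',  # hypothetical table
        schema_fields=None,            # no schema: BigQuery reads it from the AVRO files
        source_uris=['gs://example-bucket/events/part-*.avro'],      # hypothetical URIs
        source_format='AVRO',
        write_disposition='WRITE_TRUNCATE')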

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/410736db/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index c2f0c79..aff539a 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -25,7 +25,8 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
     """
     Loads files from Google cloud storage into BigQuery.
     """
-    template_fields = ('bucket','source_objects','schema_object','destination_project_dataset_table')
+    template_fields = ('bucket', 'source_objects',
+                       'schema_object', 'destination_project_dataset_table')
     template_ext = ('.sql',)
     ui_color = '#f0eee4'
 
@@ -35,14 +36,14 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         bucket,
         source_objects,
         destination_project_dataset_table,
-        schema_fields=False,
-        schema_object=False,
+        schema_fields=None,
+        schema_object=None,
         source_format='CSV',
         create_disposition='CREATE_IF_NEEDED',
         skip_leading_rows=0,
         write_disposition='WRITE_EMPTY',
         field_delimiter=',',
-        max_id_key=False,
+        max_id_key=None,
         bigquery_conn_id='bigquery_default',
         google_cloud_storage_conn_id='google_cloud_storage_default',
         delegate_to=None,
@@ -59,13 +60,15 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         :type bucket: string
         :param source_objects: List of Google cloud storage URIs to load from.
         :type object: list
-        :param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table> BigQuery table to load data
-            into. If <project> is not included, project will be the project defined in the connection json.
+        :param destination_project_dataset_table: The dotted (<project>.)<dataset>.<table>
+            BigQuery table to load data into. If <project> is not included, project will
+            be the project defined in the connection json.
         :type destination_project_dataset_table: string
         :param schema_fields: If set, the schema field list as defined here:
             https://cloud.google.com/bigquery/docs/reference/v2/jobs#configuration.load
         :type schema_fields: list
-        :param schema_object: If set, a GCS object path pointing to a .json file that contains the schema for the table.
+        :param schema_object: If set, a GCS object path pointing to a .json file that
+            contains the schema for the table.
         :param schema_object: string
         :param source_format: File format to export.
         :type source_format: string
@@ -121,13 +124,21 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         self.schema_update_options = schema_update_options
 
     def execute(self, context):
-        gcs_hook = GoogleCloudStorageHook(google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
-                                          delegate_to=self.delegate_to)
         bq_hook = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                                delegate_to=self.delegate_to)
 
-        schema_fields = self.schema_fields if self.schema_fields else json.loads(gcs_hook.download(self.bucket, self.schema_object).decode("utf-8"))
-        source_uris = ['gs://{}/{}'.format(self.bucket, schema_object) for schema_object in self.source_objects]
+        if not self.schema_fields and self.schema_object:
+            gcs_hook = GoogleCloudStorageHook(
+                google_cloud_storage_conn_id=self.google_cloud_storage_conn_id,
+                delegate_to=self.delegate_to)
+            schema_fields = json.loads(gcs_hook.download(
+                self.bucket,
+                self.schema_object).decode("utf-8"))
+        else:
+            schema_fields = self.schema_fields
+
+        source_uris = ['gs://{}/{}'.format(self.bucket, source_object)
+                       for source_object in self.source_objects]
         conn = bq_hook.get_conn()
         cursor = conn.cursor()
         cursor.run_load(
@@ -142,8 +153,12 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
             schema_update_options=self.schema_update_options)
 
         if self.max_id_key:
-            cursor.execute('SELECT MAX({}) FROM {}'.format(self.max_id_key, self.destination_project_dataset_table))
+            cursor.execute('SELECT MAX({}) FROM {}'.format(
+                self.max_id_key,
+                self.destination_project_dataset_table))
             row = cursor.fetchone()
             max_id = row[0] if row[0] else 0
-            logging.info('Loaded BQ data with max {}.{}={}'.format(self.destination_project_dataset_table, self.max_id_key, max_id))
+            logging.info('Loaded BQ data with max {}.{}={}'.format(
+                self.destination_project_dataset_table,
+                self.max_id_key, max_id))
             return max_id
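
For illustration only (not part of the commit): the same operator with an explicit schema
kept as a JSON object in GCS. Per the diff above, execute() downloads and parses that
object only when schema_fields is not given, and max_id_key makes execute() return the
MAX of that column after the load. All names below are hypothetical.

    from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

    load_csv = GoogleCloudStorageToBigQueryOperator(
        task_id='gcs_to_bq_csv',
        bucket='example-bucket',
        source_objects=['exports/users.csv'],
        destination_project_dataset_table='example_dataset.users',
        schema_object='schemas/users.json',   # parsed via json.loads(gcs_hook.download(...))
        source_format='CSV',
        skip_leading_rows=1,
        max_id_key='user_id',                 # execute() returns MAX(user_id) after loading
        dag=dag)                              # assumes an existing DAG object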

