airflow-commits mailing list archives

From criccom...@apache.org
Subject incubator-airflow git commit: [AIRFLOW-1188] Add max_bad_records param to GoogleCloudStorageToBigQueryOperator
Date Wed, 10 May 2017 16:36:54 GMT
Repository: incubator-airflow
Updated Branches:
  refs/heads/master 72cf07b48 -> 443e6b295


[AIRFLOW-1188] Add max_bad_records param to GoogleCloudStorageToBigQueryOperator

Closes #2286 from ckpklos/master


Project: http://git-wip-us.apache.org/repos/asf/incubator-airflow/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-airflow/commit/443e6b29
Tree: http://git-wip-us.apache.org/repos/asf/incubator-airflow/tree/443e6b29
Diff: http://git-wip-us.apache.org/repos/asf/incubator-airflow/diff/443e6b29

Branch: refs/heads/master
Commit: 443e6b295f94397c77082776238147e7c2b19435
Parents: 72cf07b
Author: ckpklos <piotr.klos@circlekeurope.com>
Authored: Wed May 10 09:36:48 2017 -0700
Committer: Chris Riccomini <criccomini@apache.org>
Committed: Wed May 10 09:36:48 2017 -0700

----------------------------------------------------------------------
 airflow/contrib/hooks/bigquery_hook.py | 7 +++++++
 airflow/contrib/operators/gcs_to_bq.py | 6 ++++++
 2 files changed, 13 insertions(+)
----------------------------------------------------------------------
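
For context: in BigQuery's jobs API, the knob being exposed here is
configuration.load.maxBadRecords, the integer field that sets how many
invalid rows a load job may skip before it fails (the API default is 0,
i.e. fail on the first bad record). A minimal sketch of the load
configuration the patched hook builds when the new parameter is non-zero
(the other fields are illustrative values, not copied from a real job):

configuration = {
    'load': {
        'sourceFormat': 'CSV',
        'fieldDelimiter': ',',
        'skipLeadingRows': 1,
        'maxBadRecords': 10,  # new: tolerate up to 10 bad rows before failing the job
    }
}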


http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/443e6b29/airflow/contrib/hooks/bigquery_hook.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/hooks/bigquery_hook.py b/airflow/contrib/hooks/bigquery_hook.py
index 06de4e8..59ece43 100644
--- a/airflow/contrib/hooks/bigquery_hook.py
+++ b/airflow/contrib/hooks/bigquery_hook.py
@@ -369,6 +369,7 @@ class BigQueryBaseCursor(object):
                  skip_leading_rows=0,
                  write_disposition='WRITE_EMPTY',
                  field_delimiter=',',
+                 max_bad_records=0,
                  schema_update_options=()):
         """
         Executes a BigQuery load command to load data from Google Cloud Storage
@@ -400,6 +401,9 @@ class BigQueryBaseCursor(object):
         :type write_disposition: string
         :param field_delimiter: The delimiter to use when loading from a CSV.
         :type field_delimiter: string
+        :param max_bad_records: The maximum number of bad records that BigQuery can
+            ignore when running the job.
+        :type max_bad_records: int
         :param schema_update_options: Allows the schema of the destination
             table to be updated as a side effect of the load job.
         :type schema_update_options: list
@@ -473,6 +477,9 @@ class BigQueryBaseCursor(object):
             configuration['load']['skipLeadingRows'] = skip_leading_rows
             configuration['load']['fieldDelimiter'] = field_delimiter
 
+        if max_bad_records:
+            configuration['load']['maxBadRecords'] = max_bad_records
+
         return self.run_with_configuration(configuration)
 
     def run_with_configuration(self, configuration):
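
A sketch of exercising the hook change directly. The method shown,
run_load, is the one patched above; its name sits outside the hunk
headers, so treat it as inferred. The connection id, table, bucket, and
schema are hypothetical:

from airflow.contrib.hooks.bigquery_hook import BigQueryHook

hook = BigQueryHook(bigquery_conn_id='bigquery_default')  # hypothetical connection
cursor = hook.get_conn().cursor()

# max_bad_records is forwarded to configuration['load']['maxBadRecords']
# only when non-zero, so the default of 0 leaves the job config untouched.
cursor.run_load(
    destination_project_dataset_table='proj.dataset.events',  # hypothetical table
    schema_fields=[{'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'}],
    source_uris=['gs://example-bucket/events/2017-05-10.csv'],  # hypothetical URI
    skip_leading_rows=1,
    max_bad_records=10,  # skip up to 10 invalid rows before failing the job
)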

http://git-wip-us.apache.org/repos/asf/incubator-airflow/blob/443e6b29/airflow/contrib/operators/gcs_to_bq.py
----------------------------------------------------------------------
diff --git a/airflow/contrib/operators/gcs_to_bq.py b/airflow/contrib/operators/gcs_to_bq.py
index aff539a..44cf7b6 100644
--- a/airflow/contrib/operators/gcs_to_bq.py
+++ b/airflow/contrib/operators/gcs_to_bq.py
@@ -43,6 +43,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         skip_leading_rows=0,
         write_disposition='WRITE_EMPTY',
         field_delimiter=',',
+        max_bad_records=0,
         max_id_key=None,
         bigquery_conn_id='bigquery_default',
         google_cloud_storage_conn_id='google_cloud_storage_default',
@@ -80,6 +81,9 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         :type write_disposition: string
         :param field_delimiter: The delimiter to use when loading from a CSV.
         :type field_delimiter: string
+        :param max_bad_records: The maximum number of bad records that BigQuery can
+            ignore when running the job.
+        :type max_bad_records: int
         :param max_id_key: If set, the name of a column in the BigQuery table
             that's to be loaded. This will be used to select the MAX value from
             BigQuery after the load occurs. The results will be returned by the
@@ -115,6 +119,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
         self.skip_leading_rows = skip_leading_rows
         self.write_disposition = write_disposition
         self.field_delimiter = field_delimiter
+        self.max_bad_records = max_bad_records
 
         self.max_id_key = max_id_key
         self.bigquery_conn_id = bigquery_conn_id
@@ -150,6 +155,7 @@ class GoogleCloudStorageToBigQueryOperator(BaseOperator):
             skip_leading_rows=self.skip_leading_rows,
             write_disposition=self.write_disposition,
             field_delimiter=self.field_delimiter,
+            max_bad_records=self.max_bad_records,
             schema_update_options=self.schema_update_options)
 
         if self.max_id_key:
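
Taken together, the operator change lets a DAG tolerate a bounded number
of malformed rows end to end. A hedged usage sketch: the task id, bucket,
object, table, and schema below are hypothetical, and constructor
arguments such as bucket, source_objects, destination_project_dataset_table,
and schema_fields exist on the operator in this release but sit outside
the hunks shown above:

from datetime import datetime

from airflow import DAG
from airflow.contrib.operators.gcs_to_bq import GoogleCloudStorageToBigQueryOperator

dag = DAG('gcs_to_bq_example', start_date=datetime(2017, 5, 10),
          schedule_interval=None)

load_events = GoogleCloudStorageToBigQueryOperator(
    task_id='load_events',                                    # hypothetical task id
    bucket='example-bucket',                                  # hypothetical bucket
    source_objects=['events/2017-05-10.csv'],                 # hypothetical object
    destination_project_dataset_table='proj.dataset.events',  # hypothetical table
    schema_fields=[
        {'name': 'id', 'type': 'INTEGER', 'mode': 'REQUIRED'},
        {'name': 'payload', 'type': 'STRING', 'mode': 'NULLABLE'},
    ],
    skip_leading_rows=1,
    field_delimiter=',',
    write_disposition='WRITE_EMPTY',
    max_bad_records=10,  # new in this commit: allow up to 10 bad rows before failing
    dag=dag,
)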

