beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pabl...@apache.org
Subject [beam] branch master updated: Fix issue with update schema source format
Date Tue, 03 Aug 2021 16:52:25 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b994d6  Fix issue with update schema source format
     new 9ce826e  Merge pull request #15237 from [BEAM-12669] Fix issue with update schema source format
3b994d6 is described below

commit 3b994d63be7d7d408c6c9d782db88df119d5ae13
Author: Sayat Satybaldiyev <sayat.satybaldiyev@getcruise.com>
AuthorDate: Tue Jul 27 18:08:22 2021 -0700

    Fix issue with update schema source format
---
 .../apache_beam/io/gcp/bigquery_file_loads.py      | 32 ++++++++++++++++------
 .../apache_beam/io/gcp/bigquery_write_it_test.py   | 27 +++++++++++++++---
 2 files changed, 46 insertions(+), 13 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 57204d4..e211ab4 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -344,16 +344,25 @@ class UpdateDestinationSchema(beam.DoFn):
       write_disposition=None,
       test_client=None,
       additional_bq_parameters=None,
-      step_name=None):
+      step_name=None,
+      source_format=None):
     self._test_client = test_client
     self._write_disposition = write_disposition
     self._additional_bq_parameters = additional_bq_parameters or {}
     self._step_name = step_name
+    self._source_format = source_format
 
   def setup(self):
     self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
     self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)
 
+  def display_data(self):
+    return {
+        'write_disposition': str(self._write_disposition),
+        'additional_bq_params': str(self._additional_bq_parameters),
+        'source_format': str(self._source_format),
+    }
+
   def process(self, element, schema_mod_job_name_prefix):
     destination = element[0]
     temp_table_load_job_reference = element[1]
@@ -415,7 +424,7 @@ class UpdateDestinationSchema(beam.DoFn):
     uid = _bq_uuid()
     job_name = '%s_%s_%s' % (schema_mod_job_name_prefix, destination_hash, uid)
 
-    _LOGGER.debug(
+    _LOGGER.info(
         'Triggering schema modification job %s on %s',
         job_name,
         table_reference)
@@ -429,7 +438,8 @@ class UpdateDestinationSchema(beam.DoFn):
         write_disposition='WRITE_APPEND',
         create_disposition='CREATE_NEVER',
         additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels())
+        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+        source_format=self._source_format)
     yield (destination, schema_update_job_reference)
 
 
@@ -573,7 +583,8 @@ class TriggerLoadJobs(beam.DoFn):
         'additional_bq_params': str(self.additional_bq_parameters),
         'schema': str(self.schema),
         'launchesBigQueryJobs': DisplayDataItem(
-            True, label="This Dataflow job launches bigquery jobs.")
+            True, label="This Dataflow job launches bigquery jobs."),
+        'source_format': str(self.source_format),
     }
     return result
 
@@ -619,8 +630,7 @@ class TriggerLoadJobs(beam.DoFn):
             table_reference.tableId))
     uid = _bq_uuid()
     job_name = '%s_%s_%s' % (load_job_name_prefix, destination_hash, uid)
-    _LOGGER.debug(
-        'Load job has %s files. Job name is %s.', len(files), job_name)
+    _LOGGER.info('Load job has %s files. Job name is %s.', len(files), job_name)
 
     create_disposition = self.create_disposition
     if self.temporary_tables:
@@ -635,11 +645,13 @@ class TriggerLoadJobs(beam.DoFn):
 
     _LOGGER.info(
         'Triggering job %s to load data to BigQuery table %s.'
-        'Schema: %s. Additional parameters: %s',
+        'Schema: %s. Additional parameters: %s. Source format: %s',
         job_name,
         table_reference,
         schema,
-        additional_parameters)
+        additional_parameters,
+        self.source_format,
+    )
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
     job_reference = self.bq_wrapper.perform_load_job(
@@ -1015,7 +1027,9 @@ class BigQueryBatchFileLoads(beam.PTransform):
                 write_disposition=self.write_disposition,
                 test_client=self.test_client,
                 additional_bq_parameters=self.additional_bq_parameters,
-                step_name=step_name),
+                step_name=step_name,
+                source_format=self._temp_file_format,
+            ),
             schema_mod_job_name_pcv))
 
     finished_schema_mod_jobs_pc = (
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index c7f1d44..3e0c641 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -367,21 +367,40 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
     self.create_table(table_name)
     table_id = '{}.{}'.format(self.dataset_id, table_name)
 
-    input_data = [{"int64": 1, "bool": True}, {"int64": 2, "bool": False}]
+    input_data = [{
+        "int64": num, "bool": True, "nested_field": {
+            "fruit": "Apple"
+        }
+    } for num in range(1, 3)]
 
     table_schema = {
         "fields": [{
             "name": "int64", "type": "INT64"
         }, {
             "name": "bool", "type": "BOOL"
-        }]
+        },
+                   {
+                       "name": "nested_field",
+                       "type": "RECORD",
+                       "mode": "REPEATED",
+                       "fields": [
+                           {
+                               "name": "fruit",
+                               "type": "STRING",
+                               "mode": "NULLABLE"
+                           },
+                       ]
+                   }]
     }
 
     args = self.test_pipeline.get_full_options_as_args(
         on_success_matcher=BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT bytes, date, time, int64, bool FROM %s" % table_id,
-            data=[(None, None, None, 1, True), (None, None, None, 2, False)]))
+            query=
+            "SELECT bytes, date, time, int64, bool, nested_field.fruit FROM %s"
+            % table_id,
+            data=[(None, None, None, num, True, "Apple")
+                  for num in range(1, 3)]))
 
     with beam.Pipeline(argv=args) as p:
       # pylint: disable=expression-not-assigned

Mime
View raw message