From: pabloem@apache.org
To: commits@beam.apache.org
Date: Tue, 03 Aug 2021 16:52:25 +0000
Subject: [beam] branch master updated: Fix issue with update schema source format
X-Git-Repo: beam
X-Git-Refname: refs/heads/master
X-Git-Oldrev: b20a42eedb67339f63beb0859416b971e3e76ad9
X-Git-Newrev: 3b994d63be7d7d408c6c9d782db88df119d5ae13

This is an automated email from the ASF dual-hosted git repository.
pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 3b994d6  Fix issue with update schema source format
     new 9ce826e  Merge pull request #15237 from [BEAM-12669] Fix issue with update schema source format
3b994d6 is described below

commit 3b994d63be7d7d408c6c9d782db88df119d5ae13
Author: Sayat Satybaldiyev
AuthorDate: Tue Jul 27 18:08:22 2021 -0700

    Fix issue with update schema source format
---
 .../apache_beam/io/gcp/bigquery_file_loads.py      | 32 ++++++++++++++++------
 .../apache_beam/io/gcp/bigquery_write_it_test.py   | 27 +++++++++++++++---
 2 files changed, 46 insertions(+), 13 deletions(-)

diff --git a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
index 57204d4..e211ab4 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_file_loads.py
@@ -344,16 +344,25 @@ class UpdateDestinationSchema(beam.DoFn):
       write_disposition=None,
       test_client=None,
       additional_bq_parameters=None,
-      step_name=None):
+      step_name=None,
+      source_format=None):
     self._test_client = test_client
     self._write_disposition = write_disposition
     self._additional_bq_parameters = additional_bq_parameters or {}
     self._step_name = step_name
+    self._source_format = source_format

   def setup(self):
     self._bq_wrapper = bigquery_tools.BigQueryWrapper(client=self._test_client)
     self._bq_io_metadata = create_bigquery_io_metadata(self._step_name)

+  def display_data(self):
+    return {
+        'write_disposition': str(self._write_disposition),
+        'additional_bq_params': str(self._additional_bq_parameters),
+        'source_format': str(self._source_format),
+    }
+
   def process(self, element, schema_mod_job_name_prefix):
     destination = element[0]
     temp_table_load_job_reference = element[1]
@@ -415,7 +424,7 @@ class UpdateDestinationSchema(beam.DoFn):
     uid = _bq_uuid()
     job_name = '%s_%s_%s' % (schema_mod_job_name_prefix, destination_hash, uid)

-    _LOGGER.debug(
+    _LOGGER.info(
         'Triggering schema modification job %s on %s',
         job_name,
         table_reference)
@@ -429,7 +438,8 @@ class UpdateDestinationSchema(beam.DoFn):
         write_disposition='WRITE_APPEND',
         create_disposition='CREATE_NEVER',
         additional_load_parameters=additional_parameters,
-        job_labels=self._bq_io_metadata.add_additional_bq_job_labels())
+        job_labels=self._bq_io_metadata.add_additional_bq_job_labels(),
+        source_format=self._source_format)

     yield (destination, schema_update_job_reference)

@@ -573,7 +583,8 @@ class TriggerLoadJobs(beam.DoFn):
         'additional_bq_params': str(self.additional_bq_parameters),
         'schema': str(self.schema),
         'launchesBigQueryJobs': DisplayDataItem(
-            True, label="This Dataflow job launches bigquery jobs.")
+            True, label="This Dataflow job launches bigquery jobs."),
+        'source_format': str(self.source_format),
     }
     return result

@@ -619,8 +630,7 @@ class TriggerLoadJobs(beam.DoFn):
             table_reference.tableId))
     uid = _bq_uuid()
     job_name = '%s_%s_%s' % (load_job_name_prefix, destination_hash, uid)
-    _LOGGER.debug(
-        'Load job has %s files. Job name is %s.', len(files), job_name)
+    _LOGGER.info('Load job has %s files. Job name is %s.', len(files), job_name)

     create_disposition = self.create_disposition
     if self.temporary_tables:
@@ -635,11 +645,13 @@ class TriggerLoadJobs(beam.DoFn):

     _LOGGER.info(
         'Triggering job %s to load data to BigQuery table %s.'
-        'Schema: %s. Additional parameters: %s',
+        'Schema: %s. Additional parameters: %s. Source format: %s',
         job_name,
         table_reference,
         schema,
-        additional_parameters)
+        additional_parameters,
+        self.source_format,
+    )
     if not self.bq_io_metadata:
       self.bq_io_metadata = create_bigquery_io_metadata(self._step_name)
     job_reference = self.bq_wrapper.perform_load_job(
@@ -1015,7 +1027,9 @@ class BigQueryBatchFileLoads(beam.PTransform):
                 write_disposition=self.write_disposition,
                 test_client=self.test_client,
                 additional_bq_parameters=self.additional_bq_parameters,
-                step_name=step_name),
+                step_name=step_name,
+                source_format=self._temp_file_format,
+            ),
             schema_mod_job_name_pcv))

     finished_schema_mod_jobs_pc = (
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
index c7f1d44..3e0c641 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_write_it_test.py
@@ -367,21 +367,40 @@ class BigQueryWriteIntegrationTests(unittest.TestCase):
     self.create_table(table_name)
     table_id = '{}.{}'.format(self.dataset_id, table_name)

-    input_data = [{"int64": 1, "bool": True}, {"int64": 2, "bool": False}]
+    input_data = [{
+        "int64": num, "bool": True, "nested_field": {
+            "fruit": "Apple"
+        }
+    } for num in range(1, 3)]
     table_schema = {
         "fields": [{
             "name": "int64", "type": "INT64"
         }, {
             "name": "bool", "type": "BOOL"
-        }]
+        },
+        {
+            "name": "nested_field",
+            "type": "RECORD",
+            "mode": "REPEATED",
+            "fields": [
+                {
+                    "name": "fruit",
+                    "type": "STRING",
+                    "mode": "NULLABLE"
+                },
+            ]
+        }]
     }
     args = self.test_pipeline.get_full_options_as_args(
         on_success_matcher=BigqueryFullResultMatcher(
             project=self.project,
-            query="SELECT bytes, date, time, int64, bool FROM %s" % table_id,
-            data=[(None, None, None, 1, True), (None, None, None, 2, False)]))
+            query=
+            "SELECT bytes, date, time, int64, bool, nested_field.fruit FROM %s"
+            % table_id,
+            data=[(None, None, None, num, True, "Apple")
+                  for num in range(1, 3)]))

     with beam.Pipeline(argv=args) as p:
       # pylint: disable=expression-not-assigned
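
For context, the change above threads the transform's temp_file_format through to the schema-modification load job triggered by UpdateDestinationSchema. The snippet below is a minimal, hypothetical usage sketch of the kind of pipeline that relies on this path: WriteToBigQuery with FILE_LOADS, an appended nullable column, schemaUpdateOptions, and an explicit temp_file_format. The project/dataset/table names, the NEW_SCHEMA dict, and the run() wrapper are placeholders and not part of the commit; whether the schema-modification job actually runs depends on how BigQueryBatchFileLoads partitions the load into temporary tables.

# Hypothetical usage sketch -- not part of the commit above.
# The destination table name and schema are placeholders; the pipeline assumes
# an existing table that does not yet contain the "extra" column.
import apache_beam as beam
from apache_beam.io.gcp.bigquery_tools import FileFormat
from apache_beam.options.pipeline_options import PipelineOptions

NEW_SCHEMA = {
    'fields': [
        {'name': 'int64', 'type': 'INT64'},
        {'name': 'bool', 'type': 'BOOL'},
        # Column not yet present in the destination table; appending it goes
        # through the schema-modification load job patched above.
        {'name': 'extra', 'type': 'STRING', 'mode': 'NULLABLE'},
    ]
}


def run(argv=None):
  options = PipelineOptions(argv)
  with beam.Pipeline(options=options) as p:
    _ = (
        p
        | 'Create' >> beam.Create([
            {'int64': 1, 'bool': True, 'extra': 'a'},
            {'int64': 2, 'bool': False, 'extra': 'b'},
        ])
        | 'WriteToBQ' >> beam.io.WriteToBigQuery(
            'my-project:my_dataset.my_table',  # placeholder destination
            schema=NEW_SCHEMA,
            method=beam.io.WriteToBigQuery.Method.FILE_LOADS,
            # The commit forwards this format to UpdateDestinationSchema as
            # source_format, so the schema-modification job matches the
            # format of the temp-table load jobs.
            temp_file_format=FileFormat.AVRO,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
            additional_bq_parameters={
                'schemaUpdateOptions': ['ALLOW_FIELD_ADDITION']
            }))


if __name__ == '__main__':
  run()

The integration test added in bigquery_write_it_test.py above remains the authoritative coverage for this path; the sketch only illustrates the user-facing parameters involved.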