From: chamikara@apache.org
To: commits@beam.apache.org
Reply-To: dev@beam.apache.org
Date: Tue, 06 Jun 2017 21:05:37 -0000
Message-Id: <86268d4fdc8c4948a57a12f0d22ea121@git.apache.org>
Subject: [1/2] beam git commit: [BEAM-2405] Write to BQ using the streaming API

Repository: beam
Updated Branches:
  refs/heads/master fa3922b57 -> fdfd77510


[BEAM-2405] Write to BQ using the streaming API

Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/1498684d
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/1498684d
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/1498684d

Branch: refs/heads/master
Commit: 1498684dfea31594a236edd7fde5d299e4b0aa1e
Parents: fa3922b
Author: Sourabh Bajaj
Authored: Fri Jun 2 20:32:48 2017 -0700
Committer: chamikara@google.com
Committed: Tue Jun 6 14:04:33 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/bigquery.py      | 177 +++++++++++++++++++
 sdks/python/apache_beam/io/gcp/bigquery_test.py | 172 ++++++++++++++++++
 2 files changed, 349 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/1498684d/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 201c798..9069f73 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -115,6 +115,9 @@ from apache_beam.internal.gcp import auth
 from apache_beam.internal.gcp.json_value import from_json_value
 from apache_beam.internal.gcp.json_value import to_json_value
 from apache_beam.runners.dataflow.native_io import iobase as dataflow_io
+from apache_beam.transforms import DoFn
+from apache_beam.transforms import ParDo
+from apache_beam.transforms import PTransform
 from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils import retry
 from apache_beam.options.pipeline_options import GoogleCloudOptions
@@ -134,6 +137,7 @@ __all__ = [
     'BigQueryDisposition',
     'BigQuerySource',
     'BigQuerySink',
+    'WriteToBigQuery',
     ]

 JSON_COMPLIANCE_ERROR = 'NAN, INF and -INF values are not JSON compliant.'
@@ -813,6 +817,7 @@ class BigQueryWrapper(object):
     request = bigquery.BigqueryTablesInsertRequest(
         projectId=project_id, datasetId=dataset_id, table=table)
     response = self.client.tables.Insert(request)
+    logging.debug("Created the table with id %s", table_id)
     # The response is a bigquery.Table instance.
     return response

@@ -1134,3 +1139,175 @@ class BigQueryWrapper(object):
     else:
       result[field.name] = self._convert_cell_value_to_dict(value, field)
   return result
+
+
+class BigQueryWriteFn(DoFn):
+  """A ``DoFn`` that streams writes to BigQuery once the table is created.
+  """
+
+  def __init__(self, table_id, dataset_id, project_id, batch_size, schema,
+               create_disposition, write_disposition, client):
+    """Initialize a BigQueryWriteFn.
+
+    Args:
+      table_id: The ID of the table. The ID must contain only letters
+        (a-z, A-Z), numbers (0-9), or underscores (_). If the dataset argument
+        is None then the table argument must contain the entire table
+        reference, specified as either 'DATASET.TABLE' or
+        'PROJECT:DATASET.TABLE'.
+      dataset_id: The ID of the dataset containing this table, or None if the
+        table reference is specified entirely by the table argument.
+      project_id: The ID of the project containing this table, or None if the
+        table reference is specified entirely by the table argument.
+      batch_size: Number of rows to be written to BigQuery per streaming API
+        insert.
+      schema: The schema to be used if the BigQuery table to write to has to
+        be created. This can be specified either as a 'bigquery.TableSchema'
+        object or as a single string of the form
+        'field1:type1,field2:type2,field3:type3' that defines a comma
+        separated list of fields. Here 'type' should specify the BigQuery
+        type of the field. Single-string schemas do not support nested
+        fields, repeated fields, or specifying a BigQuery mode for fields
+        (the mode will always be set to 'NULLABLE').
+      create_disposition: A string describing what happens if the table does
+        not exist. Possible values are:
+        - BigQueryDisposition.CREATE_IF_NEEDED: create if it does not exist.
+        - BigQueryDisposition.CREATE_NEVER: fail the write if it does not
+          exist.
+      write_disposition: A string describing what happens if the table already
+        contains data. Possible values are:
+        - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
+        - BigQueryDisposition.WRITE_APPEND: add to existing rows.
+        - BigQueryDisposition.WRITE_EMPTY: fail the write if the table is not
+          empty.
+        WRITE_TRUNCATE cannot be used in streaming pipelines.
+      client: A BigQuery client to use instead of the default one; useful for
+        overriding the client in tests.
+    """
+    self.table_id = table_id
+    self.dataset_id = dataset_id
+    self.project_id = project_id
+    self.schema = schema
+    self.client = client
+    self.create_disposition = create_disposition
+    self.write_disposition = write_disposition
+    self._rows_buffer = []
+    # The default batch size is 500.
+    self._max_batch_size = batch_size or 500
+
+  @staticmethod
+  def get_table_schema(schema):
+    # Transform the table schema into a bigquery.TableSchema instance.
+    if isinstance(schema, basestring):
+      table_schema = bigquery.TableSchema()
+      schema_list = [s.strip() for s in schema.split(',')]
+      for field_and_type in schema_list:
+        field_name, field_type = field_and_type.split(':')
+        field_schema = bigquery.TableFieldSchema()
+        field_schema.name = field_name
+        field_schema.type = field_type
+        field_schema.mode = 'NULLABLE'
+        table_schema.fields.append(field_schema)
+      return table_schema
+    elif schema is None:
+      return schema
+    elif isinstance(schema, bigquery.TableSchema):
+      return schema
+    else:
+      raise TypeError('Unexpected schema argument: %s.' % schema)
+
+  def start_bundle(self):
+    self._rows_buffer = []
+    self.table_schema = self.get_table_schema(self.schema)
+
+    self.bigquery_wrapper = BigQueryWrapper(client=self.client)
+    self.bigquery_wrapper.get_or_create_table(
+        self.project_id, self.dataset_id, self.table_id, self.table_schema,
+        self.create_disposition, self.write_disposition)
+
+  def process(self, element, unused_create_fn_output=None):
+    self._rows_buffer.append(element)
+    if len(self._rows_buffer) >= self._max_batch_size:
+      self._flush_batch()
+
+  def finish_bundle(self):
+    if self._rows_buffer:
+      self._flush_batch()
+    self._rows_buffer = []
+
+  def _flush_batch(self):
+    # Flush the current batch of rows to BigQuery.
+    passed, errors = self.bigquery_wrapper.insert_rows(
+        project_id=self.project_id, dataset_id=self.dataset_id,
+        table_id=self.table_id, rows=self._rows_buffer)
+    if not passed:
+      raise RuntimeError('Could not successfully insert rows to BigQuery'
+                         ' table [%s:%s.%s]. Errors: %s' %
+                         (self.project_id, self.dataset_id,
+                          self.table_id, errors))
+    logging.debug("Successfully wrote %d rows.", len(self._rows_buffer))
+    self._rows_buffer = []
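
[Editorial aside, not part of the patch: the single-string schema form accepted
above is easiest to see with a small example. The snippet below assumes the
patch is applied and the GCP extras are installed so that
apache_beam.io.gcp.bigquery imports cleanly.]

  from apache_beam.io.gcp.bigquery import BigQueryWriteFn

  # 'name:TYPE' pairs, comma separated; every parsed field is NULLABLE.
  table_schema = BigQueryWriteFn.get_table_schema('s:STRING, n:INTEGER')
  assert [(f.name, f.type, f.mode) for f in table_schema.fields] == [
      ('s', 'STRING', 'NULLABLE'), ('n', 'INTEGER', 'NULLABLE')]
  # Nested or repeated fields require passing a bigquery.TableSchema instead.
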
+
+
+class WriteToBigQuery(PTransform):
+
+  def __init__(self, table, dataset=None, project=None, schema=None,
+               create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
+               write_disposition=BigQueryDisposition.WRITE_APPEND,
+               batch_size=None, test_client=None):
+    """Initialize a WriteToBigQuery transform.
+
+    Args:
+      table: The ID of the table. The ID must contain only letters
+        (a-z, A-Z), numbers (0-9), or underscores (_). If the dataset argument
+        is None then the table argument must contain the entire table
+        reference, specified as either 'DATASET.TABLE' or
+        'PROJECT:DATASET.TABLE'.
+      dataset: The ID of the dataset containing this table, or None if the
+        table reference is specified entirely by the table argument.
+      project: The ID of the project containing this table, or None if the
+        table reference is specified entirely by the table argument.
+      schema: The schema to be used if the BigQuery table to write to has to
+        be created. This can be specified either as a 'bigquery.TableSchema'
+        object or as a single string of the form
+        'field1:type1,field2:type2,field3:type3' that defines a comma
+        separated list of fields. Here 'type' should specify the BigQuery
+        type of the field. Single-string schemas do not support nested
+        fields, repeated fields, or specifying a BigQuery mode for fields
+        (the mode will always be set to 'NULLABLE').
+      create_disposition: A string describing what happens if the table does
+        not exist. Possible values are:
+        - BigQueryDisposition.CREATE_IF_NEEDED: create if it does not exist.
+        - BigQueryDisposition.CREATE_NEVER: fail the write if it does not
+          exist.
+      write_disposition: A string describing what happens if the table already
+        contains data. Possible values are:
+        - BigQueryDisposition.WRITE_TRUNCATE: delete existing rows.
+        - BigQueryDisposition.WRITE_APPEND: add to existing rows.
+        - BigQueryDisposition.WRITE_EMPTY: fail the write if the table is not
+          empty.
+        WRITE_TRUNCATE cannot be used in streaming pipelines.
+      batch_size: Number of rows to be written to BigQuery per streaming API
+        insert.
+      test_client: Override the default BigQuery client used for testing.
+    """
+    self.table_reference = _parse_table_reference(table, dataset, project)
+    self.create_disposition = BigQueryDisposition.validate_create(
+        create_disposition)
+    self.write_disposition = BigQueryDisposition.validate_write(
+        write_disposition)
+    self.schema = schema
+    self.batch_size = batch_size
+    self.test_client = test_client
+
+  def expand(self, pcoll):
+    bigquery_write_fn = BigQueryWriteFn(
+        table_id=self.table_reference.tableId,
+        dataset_id=self.table_reference.datasetId,
+        project_id=self.table_reference.projectId,
+        batch_size=self.batch_size,
+        schema=self.schema,
+        create_disposition=self.create_disposition,
+        write_disposition=self.write_disposition,
+        client=self.test_client)
+    return pcoll | 'Write to BigQuery' >> ParDo(bigquery_write_fn)
+
+  def display_data(self):
+    res = {}
+    if self.table_reference is not None:
+      tableSpec = '{}.{}'.format(self.table_reference.datasetId,
+                                 self.table_reference.tableId)
+      if self.table_reference.projectId is not None:
+        tableSpec = '{}:{}'.format(self.table_reference.projectId,
+                                   tableSpec)
+      res['table'] = DisplayDataItem(tableSpec, label='Table')
+    return res
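
[Editorial note, not part of the commit: for readers who want to see the new
transform end to end, a minimal usage sketch follows. The project, dataset,
table, and input rows are placeholders, and the default pipeline options are
assumed; adjust both to your environment.]

  import apache_beam as beam
  from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery

  p = beam.Pipeline()
  _ = (p
       | 'CreateRows' >> beam.Create([{'month': 1}, {'month': 2}])
       | 'WriteToBQ' >> WriteToBigQuery(
           table='table_id',
           dataset='dataset_id',
           project='project_id',
           # Single-string schema: comma separated 'name:TYPE' pairs.
           schema='month:INTEGER',
           create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
           write_disposition=BigQueryDisposition.WRITE_APPEND,
           # Rows are buffered per bundle and flushed every batch_size rows
           # (default 500) and again at finish_bundle().
           batch_size=500))
  p.run().wait_until_finish()
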
http://git-wip-us.apache.org/repos/asf/beam/blob/1498684d/sdks/python/apache_beam/io/gcp/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index a26050c..b7f766b 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -824,6 +824,178 @@ class TestBigQueryWrapper(unittest.TestCase):
     self.assertEqual(new_dataset.datasetReference.datasetId, 'dataset_id')


+@unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
+class WriteToBigQuery(unittest.TestCase):
+
+  def test_dofn_client_start_bundle_called(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id',
+            tableId='table_id'))
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    self.assertTrue(client.tables.Get.called)
+
+  def test_dofn_client_start_bundle_create_called(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = None
+    client.tables.Insert.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id',
+            tableId='table_id'))
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    self.assertTrue(client.tables.Get.called)
+    self.assertTrue(client.tables.Insert.called)
+
+  def test_dofn_client_process_performs_batching(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id',
+            tableId='table_id'))
+    client.tabledata.InsertAll.return_value = \
+        bigquery.TableDataInsertAllResponse(insertErrors=[])
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    fn.process({'month': 1})
+
+    self.assertTrue(client.tables.Get.called)
+    # InsertRows not called as batch size is not hit yet.
+    self.assertFalse(client.tabledata.InsertAll.called)
+
+  def test_dofn_client_process_flush_called(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id',
+            tableId='table_id'))
+    client.tabledata.InsertAll.return_value = (
+        bigquery.TableDataInsertAllResponse(insertErrors=[]))
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    fn.process({'month': 1})
+    fn.process({'month': 2})
+    self.assertTrue(client.tables.Get.called)
+    # InsertRows called as batch size is hit.
+    self.assertTrue(client.tabledata.InsertAll.called)
+
+  def test_dofn_client_finish_bundle_flush_called(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id',
+            tableId='table_id'))
+    client.tabledata.InsertAll.return_value = \
+        bigquery.TableDataInsertAllResponse(insertErrors=[])
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    fn.process({'month': 1})
+
+    self.assertTrue(client.tables.Get.called)
+    # InsertRows not called as batch size is not hit.
+    self.assertFalse(client.tabledata.InsertAll.called)
+
+    fn.finish_bundle()
+    # InsertRows called in finish_bundle.
+    self.assertTrue(client.tabledata.InsertAll.called)
+
+  def test_dofn_client_no_records(self):
+    client = mock.Mock()
+    client.tables.Get.return_value = bigquery.Table(
+        tableReference=bigquery.TableReference(
+            projectId='project_id', datasetId='dataset_id',
+            tableId='table_id'))
+    client.tabledata.InsertAll.return_value = \
+        bigquery.TableDataInsertAllResponse(insertErrors=[])
+    create_disposition = beam.io.BigQueryDisposition.CREATE_NEVER
+    write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND
+
+    fn = beam.io.gcp.bigquery.BigQueryWriteFn(
+        table_id='table_id',
+        dataset_id='dataset_id',
+        project_id='project_id',
+        batch_size=2,
+        schema='month:INTEGER',
+        create_disposition=create_disposition,
+        write_disposition=write_disposition,
+        client=client)
+
+    fn.start_bundle()
+    self.assertTrue(client.tables.Get.called)
+    # InsertRows not called as batch size is not hit.
+    self.assertFalse(client.tabledata.InsertAll.called)
+
+    fn.finish_bundle()
+    # InsertRows not called in finish_bundle as there are no records.
+    self.assertFalse(client.tabledata.InsertAll.called)
+
+  def test_simple_schema_parsing(self):
+    table_schema = beam.io.gcp.bigquery.BigQueryWriteFn.get_table_schema(
+        schema='s:STRING, n:INTEGER')
+    string_field = bigquery.TableFieldSchema(
+        name='s', type='STRING', mode='NULLABLE')
+    number_field = bigquery.TableFieldSchema(
+        name='n', type='INTEGER', mode='NULLABLE')
+    expected_table_schema = bigquery.TableSchema(
+        fields=[string_field, number_field])
+    self.assertEqual(expected_table_schema, table_schema)
+
+
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)
   unittest.main()
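
[Editorial note, not part of the commit: to exercise just the new tests
locally, something along these lines should work from the sdks/python
directory, assuming the SDK is installed with the GCP extras and mock is
available; the exact invocation is an assumption, not taken from the commit.]

  python -m unittest apache_beam.io.gcp.bigquery_test.WriteToBigQuery
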