beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: We shouldn't write to re-created tables for 2 mins
Date Wed, 26 Jul 2017 05:59:14 GMT
Repository: beam
Updated Branches:
  refs/heads/master d919394c7 -> a9fdc3bc4


We shouldn't write to re-created tables for 2 mins


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/483abc09
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/483abc09
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/483abc09

Branch: refs/heads/master
Commit: 483abc0941f0fb42c506565f6912153296fd94b5
Parents: d919394
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Mon Jul 24 15:54:02 2017 -0700
Committer: chamikara@google.com <chamikara@google.com>
Committed: Tue Jul 25 22:58:23 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcp/bigquery.py      | 19 +++++++++++++++----
 sdks/python/apache_beam/io/gcp/bigquery_test.py |  3 ++-
 2 files changed, 17 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/483abc09/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 23fd310..db6715a 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1002,12 +1002,23 @@ class BigQueryWrapper(object):
     if found_table and write_disposition != BigQueryDisposition.WRITE_TRUNCATE:
       return found_table
     else:
+      created_table = self._create_table(project_id=project_id,
+                                         dataset_id=dataset_id,
+                                         table_id=table_id,
+                                         schema=schema or found_table.schema)
       # if write_disposition == BigQueryDisposition.WRITE_TRUNCATE we delete
       # the table before this point.
-      return self._create_table(project_id=project_id,
-                                dataset_id=dataset_id,
-                                table_id=table_id,
-                                schema=schema or found_table.schema)
+      if write_disposition == BigQueryDisposition.WRITE_TRUNCATE:
+        # BigQuery can route inserts to the old table for up to 2 mins, so
+        # wait after re-creating the table before writing to it.
+        logging.warning('Sleeping for 150 seconds before the write as ' +
+                        'BigQuery inserts can be routed to deleted table ' +
+                        'for 2 mins after the delete and create.')
+        # TODO(BEAM-2673): Remove this sleep by migrating to load api
+        time.sleep(150)
+        return created_table
+      else:
+        return created_table
 
   def run_query(self, project_id, query, use_legacy_sql, flatten_results,
                 dry_run=False):

http://git-wip-us.apache.org/repos/asf/beam/blob/483abc09/sdks/python/apache_beam/io/gcp/bigquery_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery_test.py b/sdks/python/apache_beam/io/gcp/bigquery_test.py
index 14247ba..bfd06ac 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery_test.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery_test.py
@@ -650,7 +650,8 @@ class TestBigQueryWriter(unittest.TestCase):
     self.assertFalse(client.tables.Delete.called)
     self.assertFalse(client.tables.Insert.called)
 
-  def test_table_with_write_disposition_truncate(self):
+  @mock.patch('time.sleep', return_value=None)
+  def test_table_with_write_disposition_truncate(self, _patched_sleep):
     client = mock.Mock()
     table = bigquery.Table(
         tableReference=bigquery.TableReference(


Mime
View raw message