beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From da...@apache.org
Subject [34/50] beam git commit: [BEAM-2405] Override to sink interface in the batch dataflow BQ
Date Thu, 08 Jun 2017 01:35:23 GMT
[BEAM-2405] Override WriteToBigQuery to use the BigQuerySink interface in the batch Dataflow runner


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/e641997a
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/e641997a
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/e641997a

Branch: refs/heads/DSL_SQL
Commit: e641997affc378ec0337d5ac19d8677cba0d0933
Parents: b6347d0
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Tue Jun 6 19:49:54 2017 -0700
Committer: chamikara@google.com <chamikara@google.com>
Committed: Tue Jun 6 22:17:05 2017 -0700

----------------------------------------------------------------------
 .../examples/cookbook/bigquery_tornadoes.py       | 11 +++++------
 sdks/python/apache_beam/io/gcp/bigquery.py        |  2 +-
 .../runners/dataflow/dataflow_runner.py           | 18 ++++++++++++++++++
 3 files changed, 24 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
index d3b216e..1ca49c5 100644
--- a/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
+++ b/sdks/python/apache_beam/examples/cookbook/bigquery_tornadoes.py
@@ -83,12 +83,11 @@ def run(argv=None):
 
     # Write the output using a "Write" transform that has side effects.
     # pylint: disable=expression-not-assigned
-    counts | 'write' >> beam.io.Write(
-        beam.io.BigQuerySink(
-            known_args.output,
-            schema='month:INTEGER, tornado_count:INTEGER',
-            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
-            write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE))
+    counts | 'Write' >> beam.io.WriteToBigQuery(
+        known_args.output,
+        schema='month:INTEGER, tornado_count:INTEGER',
+        create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
+        write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)
 
     # Run the pipeline (all operations are deferred until run() is called).
 

http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/io/gcp/bigquery.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/bigquery.py b/sdks/python/apache_beam/io/gcp/bigquery.py
index 9069f73..da8be68 100644
--- a/sdks/python/apache_beam/io/gcp/bigquery.py
+++ b/sdks/python/apache_beam/io/gcp/bigquery.py
@@ -1299,7 +1299,7 @@ class WriteToBigQuery(PTransform):
         create_disposition=self.create_disposition,
         write_disposition=self.write_disposition,
         client=self.test_client)
-    return pcoll | 'Write to BigQuery' >> ParDo(bigquery_write_fn)
+    return pcoll | 'WriteToBigQuery' >> ParDo(bigquery_write_fn)
 
   def display_data(self):
     res = {}

http://git-wip-us.apache.org/repos/asf/beam/blob/e641997a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 62cea33..3fc8983 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -27,6 +27,7 @@ import time
 import traceback
 import urllib
 
+import apache_beam as beam
 from apache_beam import error
 from apache_beam import coders
 from apache_beam import pvalue
@@ -378,6 +379,23 @@ class DataflowRunner(PipelineRunner):
           PropertyNames.ENCODING: step.encoding,
           PropertyNames.OUTPUT_NAME: PropertyNames.OUT}])
 
+  def apply_WriteToBigQuery(self, transform, pcoll):
+    standard_options = pcoll.pipeline._options.view_as(StandardOptions)
+    if standard_options.streaming:
+      if (transform.write_disposition ==
+          beam.io.BigQueryDisposition.WRITE_TRUNCATE):
+        raise RuntimeError('Can not use write truncation mode in streaming')
+      return self.apply_PTransform(transform, pcoll)
+    else:
+      return pcoll  | 'WriteToBigQuery' >> beam.io.Write(
+          beam.io.BigQuerySink(
+              transform.table_reference.tableId,
+              transform.table_reference.datasetId,
+              transform.table_reference.projectId,
+              transform.schema,
+              transform.create_disposition,
+              transform.write_disposition))
+
   def apply_GroupByKey(self, transform, pcoll):
     # Infer coder of parent.
     #


Mime
View raw message