beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [beam] branch master updated: [BEAM-12590] Automatically upgrading Dataflow Python pipelines that use cross-language transforms to Runner v2 (#15079)
Date Tue, 13 Jul 2021 01:40:27 GMT
This is an automated email from the ASF dual-hosted git repository.

chamikara pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 05699c3  [BEAM-12590] Automatically upgrading Dataflow Python pipelines that use cross-language transforms to Runner v2 (#15079)
05699c3 is described below

commit 05699c3410dfce3c62013140a57c266a2839bccb
Author: Chamikara Jayalath <chamikara@apache.org>
AuthorDate: Mon Jul 12 18:39:41 2021 -0700

    [BEAM-12590] Automatically upgrading Dataflow Python pipelines that use cross-language transforms to Runner v2 (#15079)
    
    * Automatically upgrading Dataflow Python pipelines that use cross-language transforms to Runner v2.
    
    * Revert change to a test
---
 .../apache_beam/runners/dataflow/dataflow_runner.py    | 18 ++++++++++++------
 1 file changed, 12 insertions(+), 6 deletions(-)

diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 2b8aa19..c112bdd 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -408,6 +408,18 @@ class DataflowRunner(PipelineRunner):
           'Google Cloud Dataflow runner not available, '
           'please install apache_beam[gcp]')
 
+    debug_options = options.view_as(DebugOptions)
+    if pipeline.contains_external_transforms:
+      if not apiclient._use_unified_worker(options):
+        _LOGGER.info(
+            'Automatically enabling Dataflow Runner v2 since the '
+            'pipeline used cross-language transforms.')
+        # This has to be done before any Fn API specific setup.
+        debug_options.add_experiment("use_runner_v2")
+      # Dataflow multi-language pipelines require portable job submission.
+      if not debug_options.lookup_experiment('use_portable_job_submission'):
+        debug_options.add_experiment("use_portable_job_submission")
+
     self._maybe_add_unified_worker_missing_options(options)
 
     use_fnapi = apiclient._use_fnapi(options)
@@ -511,12 +523,6 @@ class DataflowRunner(PipelineRunner):
       debug_options.add_experiment(
           'min_cpu_platform=' + worker_options.min_cpu_platform)
 
-    if (apiclient._use_unified_worker(options) and
-        pipeline.contains_external_transforms):
-      # All Dataflow multi-language pipelines (supported by Runner v2 only) use
-      # portable job submission by default.
-      debug_options.add_experiment("use_portable_job_submission")
-
     # Elevate "enable_streaming_engine" to pipeline option, but using the
     # existing experiment.
     google_cloud_options = options.view_as(GoogleCloudOptions)

Mime
View raw message