beam-commits mailing list archives

From taki...@apache.org
Subject [21/50] [abbrv] beam git commit: Set the type of batch jobs to FNAPI_BATCH when beam_fn_api experiment is specified.
Date Thu, 13 Jul 2017 03:06:33 GMT
Set the type of batch jobs to FNAPI_BATCH when beam_fn_api experiment is specified.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/b1313ffe
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/b1313ffe
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/b1313ffe

Branch: refs/heads/DSL_SQL
Commit: b1313ffef5bf8a2dd17ee20b6dd77f62d4174659
Parents: 78a39bd
Author: Valentyn Tymofieiev <valentyn@google.com>
Authored: Fri Jul 7 15:14:56 2017 -0700
Committer: Tyler Akidau <takidau@apache.org>
Committed: Wed Jul 12 20:01:00 2017 -0700

----------------------------------------------------------------------
 .../runners/dataflow/dataflow_runner.py         | 16 ++---------
 .../runners/dataflow/internal/apiclient.py      | 29 ++++++++++++++++++--
 .../runners/dataflow/internal/apiclient_test.py |  5 +---
 3 files changed, 29 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
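
For background, the beam_fn_api experiment is passed through the standard
--experiments debug flag. A minimal sketch of how a pipeline author opts a
batch job into the Fn API path under this change (the project and bucket
values are placeholders, not taken from this commit):

from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder GCP settings; only --experiments=beam_fn_api is relevant here.
options = PipelineOptions([
    '--runner=DataflowRunner',
    '--project=my-project',                # hypothetical project id
    '--temp_location=gs://my-bucket/tmp',  # hypothetical staging bucket
    '--experiments=beam_fn_api',
])
assert 'beam_fn_api' in options.view_as(DebugOptions).experiments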


http://git-wip-us.apache.org/repos/asf/beam/blob/b1313ffe/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 57bcc5e..059e139 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -46,8 +46,8 @@ from apache_beam.runners.runner import PipelineRunner
 from apache_beam.runners.runner import PipelineState
 from apache_beam.transforms.display import DisplayData
 from apache_beam.typehints import typehints
-from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import SetupOptions
+from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import TestOptions
 from apache_beam.utils.plugin import BeamPlugin
 
@@ -65,12 +65,6 @@ class DataflowRunner(PipelineRunner):
   if blocking is set to False.
   """
 
-  # Environment version information. It is passed to the service during
-  # a job submission and is used by the service to establish what features
-  # are expected by the workers.
-  BATCH_ENVIRONMENT_MAJOR_VERSION = '6'
-  STREAMING_ENVIRONMENT_MAJOR_VERSION = '1'
-
   # A list of PTransformOverride objects to be applied before running a pipeline
  # using DataflowRunner.
   # Currently this only works for overrides where the input and output types do
@@ -268,15 +262,9 @@ class DataflowRunner(PipelineRunner):
     if test_options.dry_run:
       return None
 
-    standard_options = pipeline._options.view_as(StandardOptions)
-    if standard_options.streaming:
-      job_version = DataflowRunner.STREAMING_ENVIRONMENT_MAJOR_VERSION
-    else:
-      job_version = DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION
-
     # Get a Dataflow API client and set its options
     self.dataflow_client = apiclient.DataflowApplicationClient(
-        pipeline._options, job_version)
+        pipeline._options)
 
     # Create the job
     result = DataflowPipelineResult(

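The net effect on the runner is that environment-version selection is no
longer its concern: the runner passes its options straight through, and the
client derives both the job type and the environment major version from
them, as the next file shows.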
http://git-wip-us.apache.org/repos/asf/beam/blob/b1313ffe/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
index edac9d7..33dfe19 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient.py
@@ -49,6 +49,13 @@ from apache_beam.options.pipeline_options import StandardOptions
 from apache_beam.options.pipeline_options import WorkerOptions
 
 
+# Environment version information. It is passed to the service during
+# a job submission and is used by the service to establish what features
+# are expected by the workers.
+_LEGACY_ENVIRONMENT_MAJOR_VERSION = '6'
+_FNAPI_ENVIRONMENT_MAJOR_VERSION = '1'
+
+
 class Step(object):
   """Wrapper for a dataflow Step protobuf."""
 
@@ -146,7 +153,10 @@ class Environment(object):
     if self.standard_options.streaming:
       job_type = 'FNAPI_STREAMING'
     else:
-      job_type = 'PYTHON_BATCH'
+      if _use_fnapi(options):
+        job_type = 'FNAPI_BATCH'
+      else:
+        job_type = 'PYTHON_BATCH'
     self.proto.version.additionalProperties.extend([
         dataflow.Environment.VersionValue.AdditionalProperty(
             key='job_type',
@@ -360,11 +370,16 @@ class Job(object):
 class DataflowApplicationClient(object):
   """A Dataflow API client used by application code to create and query jobs."""
 
-  def __init__(self, options, environment_version):
+  def __init__(self, options):
     """Initializes a Dataflow API client object."""
     self.standard_options = options.view_as(StandardOptions)
     self.google_cloud_options = options.view_as(GoogleCloudOptions)
-    self.environment_version = environment_version
+
+    if _use_fnapi(options):
+      self.environment_version = _FNAPI_ENVIRONMENT_MAJOR_VERSION
+    else:
+      self.environment_version = _LEGACY_ENVIRONMENT_MAJOR_VERSION
+
     if self.google_cloud_options.no_auth:
       credentials = None
     else:
@@ -706,6 +721,14 @@ def translate_mean(accumulator, metric_update):
     metric_update.kind = None
 
 
+def _use_fnapi(pipeline_options):
+  standard_options = pipeline_options.view_as(StandardOptions)
+  debug_options = pipeline_options.view_as(DebugOptions)
+
+  return standard_options.streaming or (
+      debug_options.experiments and 'beam_fn_api' in debug_options.experiments)
+
+
 # To enable a counter on the service, add it to this dictionary.
 metric_translations = {
     cy_combiners.CountCombineFn: ('sum', translate_scalar),

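Taken together, the new _use_fnapi helper now drives both the job type and
the environment major version. The following standalone sketch restates the
decision implied by the hunks above; _use_fnapi and the version constants
are copied from the diff, while job_type_and_version is a hypothetical
helper added here for illustration, not part of the committed code:

from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import StandardOptions

_LEGACY_ENVIRONMENT_MAJOR_VERSION = '6'
_FNAPI_ENVIRONMENT_MAJOR_VERSION = '1'


def _use_fnapi(pipeline_options):
  # Mirrors the helper added in apiclient.py: streaming jobs always use the
  # Fn API; batch jobs use it only when the beam_fn_api experiment is set.
  standard_options = pipeline_options.view_as(StandardOptions)
  debug_options = pipeline_options.view_as(DebugOptions)
  return standard_options.streaming or (
      debug_options.experiments and 'beam_fn_api' in debug_options.experiments)


def job_type_and_version(options):
  # Hypothetical helper summarizing Environment.__init__ and
  # DataflowApplicationClient.__init__ after this commit.
  if _use_fnapi(options):
    streaming = options.view_as(StandardOptions).streaming
    job_type = 'FNAPI_STREAMING' if streaming else 'FNAPI_BATCH'
    return job_type, _FNAPI_ENVIRONMENT_MAJOR_VERSION
  return 'PYTHON_BATCH', _LEGACY_ENVIRONMENT_MAJOR_VERSION


print(job_type_and_version(PipelineOptions(['--experiments=beam_fn_api'])))
# ('FNAPI_BATCH', '1')
print(job_type_and_version(PipelineOptions([])))
# ('PYTHON_BATCH', '6')

Note the asymmetry this preserves: streaming jobs were already on the Fn API
path (FNAPI_STREAMING), so the experiment only changes behavior for batch
jobs.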
http://git-wip-us.apache.org/repos/asf/beam/blob/b1313ffe/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
index 55211f7..407ffcf 100644
--- a/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/internal/apiclient_test.py
@@ -22,7 +22,6 @@ from mock import Mock
 from apache_beam.metrics.cells import DistributionData
 from apache_beam.options.pipeline_options import PipelineOptions
 
-from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
 from apache_beam.runners.dataflow.internal.clients import dataflow
 
 # Protect against environments where apitools library is not available.
@@ -40,9 +39,7 @@ class UtilTest(unittest.TestCase):
   @unittest.skip("Enable once BEAM-1080 is fixed.")
   def test_create_application_client(self):
     pipeline_options = PipelineOptions()
-    apiclient.DataflowApplicationClient(
-        pipeline_options,
-        DataflowRunner.BATCH_ENVIRONMENT_MAJOR_VERSION)
+    apiclient.DataflowApplicationClient(pipeline_options)
 
   def test_set_network(self):
     pipeline_options = PipelineOptions(

