beam-commits mailing list archives

From pabl...@apache.org
Subject [beam] branch master updated: Adding a warning to use multi-workers on FnApiRunner
Date Mon, 01 Mar 2021 22:03:51 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 21a11fd  Adding a warning to use multi-workers on FnApiRunner
     new 0aa07a5  Merge pull request #14108 from pabloem/fnwarning
21a11fd is described below

commit 21a11fd7198fefc6122f54af2a98b338db222071
Author: Pablo Estrada <pabloem@apache.org>
AuthorDate: Fri Feb 26 14:35:53 2021 -0800

    Adding a warning to use multi-workers on FnApiRunner
---
 sdks/python/apache_beam/runners/common.py          | 39 ++++++++++++++++++++++
 .../runners/dataflow/dataflow_runner.py            | 38 ++-------------------
 .../runners/dataflow/dataflow_runner_test.py       |  9 ++---
 .../runners/portability/fn_api_runner/fn_runner.py | 14 ++++++--
 4 files changed, 57 insertions(+), 43 deletions(-)

diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py
index 745cbc1..a929e6b 100644
--- a/sdks/python/apache_beam/runners/common.py
+++ b/sdks/python/apache_beam/runners/common.py
@@ -62,6 +62,7 @@ from apache_beam.transforms.core import WatermarkEstimatorProvider
 from apache_beam.transforms.window import GlobalWindow
 from apache_beam.transforms.window import TimestampedValue
 from apache_beam.transforms.window import WindowFn
+from apache_beam.typehints import typehints
 from apache_beam.utils.counters import Counter
 from apache_beam.utils.counters import CounterName
 from apache_beam.utils.timestamp import Timestamp
@@ -1511,3 +1512,41 @@ class DoFnContext(object):
       raise AttributeError('windows not accessible in this context')
     else:
       return self.windowed_value.windows
+
+
+def group_by_key_input_visitor(deterministic_key_coders=True):
+  # Importing here to avoid a circular dependency
+  from apache_beam.pipeline import PipelineVisitor
+  from apache_beam.transforms.core import GroupByKey
+
+  class GroupByKeyInputVisitor(PipelineVisitor):
+    """A visitor that replaces `Any` element type for input `PCollection` of
+    a `GroupByKey` with a `KV` type.
+
+    TODO(BEAM-115): Once Python SDK is compatible with the new Runner API,
+    we could directly replace the coder instead of mutating the element type.
+    """
+    def __init__(self, deterministic_key_coders=True):
+      self.deterministic_key_coders = deterministic_key_coders
+
+    def enter_composite_transform(self, transform_node):
+      self.visit_transform(transform_node)
+
+    def visit_transform(self, transform_node):
+      # Imported here to avoid circular dependencies.
+      # pylint: disable=wrong-import-order, wrong-import-position
+      if isinstance(transform_node.transform, GroupByKey):
+        pcoll = transform_node.inputs[0]
+        pcoll.element_type = typehints.coerce_to_kv_type(
+            pcoll.element_type, transform_node.full_label)
+        pcoll.requires_deterministic_key_coder = (
+            self.deterministic_key_coders and transform_node.full_label)
+        key_type, value_type = pcoll.element_type.tuple_types
+        if transform_node.outputs:
+          key = next(iter(transform_node.outputs.keys()))
+          transform_node.outputs[key].element_type = typehints.KV[
+              key_type, typehints.Iterable[value_type]]
+          transform_node.outputs[key].requires_deterministic_key_coder = (
+              self.deterministic_key_coders and transform_node.full_label)
+
+  return GroupByKeyInputVisitor(deterministic_key_coders)
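
For context: a runner applies this visitor with Pipeline.visit() before
translating the graph. A minimal sketch of that call (the pipeline below is
illustrative and not part of this commit):

    import apache_beam as beam
    from apache_beam.runners.common import group_by_key_input_visitor

    p = beam.Pipeline()
    _ = (p
         | beam.Create([('a', 1), ('b', 2)])
         | beam.GroupByKey())

    # Rewrite the GroupByKey input element type to KV and the output to
    # KV[key, Iterable[value]], as a runner would before execution.
    p.visit(group_by_key_input_visitor(deterministic_key_coders=True))
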
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
index 892284f..e8b2e19 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py
@@ -57,6 +57,7 @@ from apache_beam.portability import common_urns
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.pvalue import AsSideInput
 from apache_beam.runners.common import DoFnSignature
+from apache_beam.runners.common import group_by_key_input_visitor
 from apache_beam.runners.dataflow.internal import names
 from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api
 from apache_beam.runners.dataflow.internal.names import PropertyNames
@@ -271,41 +272,6 @@ class DataflowRunner(PipelineRunner):
     return element
 
   @staticmethod
-  def group_by_key_input_visitor(deterministic_key_coders=True):
-    # Imported here to avoid circular dependencies.
-    from apache_beam.pipeline import PipelineVisitor
-
-    class GroupByKeyInputVisitor(PipelineVisitor):
-      """A visitor that replaces `Any` element type for input `PCollection` of
-      a `GroupByKey` with a `KV` type.
-
-      TODO(BEAM-115): Once Python SDk is compatible with the new Runner API,
-      we could directly replace the coder instead of mutating the element type.
-      """
-      def enter_composite_transform(self, transform_node):
-        self.visit_transform(transform_node)
-
-      def visit_transform(self, transform_node):
-        # Imported here to avoid circular dependencies.
-        # pylint: disable=wrong-import-order, wrong-import-position
-        from apache_beam.transforms.core import GroupByKey
-        if isinstance(transform_node.transform, GroupByKey):
-          pcoll = transform_node.inputs[0]
-          pcoll.element_type = typehints.coerce_to_kv_type(
-              pcoll.element_type, transform_node.full_label)
-          pcoll.requires_deterministic_key_coder = (
-              deterministic_key_coders and transform_node.full_label)
-          key_type, value_type = pcoll.element_type.tuple_types
-          if transform_node.outputs:
-            key = DataflowRunner._only_element(transform_node.outputs.keys())
-            transform_node.outputs[key].element_type = typehints.KV[
-                key_type, typehints.Iterable[value_type]]
-            transform_node.outputs[key].requires_deterministic_key_coder = (
-                deterministic_key_coders and transform_node.full_label)
-
-    return GroupByKeyInputVisitor()
-
-  @staticmethod
   def side_input_visitor(
       use_unified_worker=False,
       use_fn_api=False,
@@ -445,7 +411,7 @@ class DataflowRunner(PipelineRunner):
     # Dataflow runner requires a KV type for GBK inputs, hence we enforce that
     # here.
     pipeline.visit(
-        self.group_by_key_input_visitor(
+        group_by_key_input_visitor(
             not pipeline._options.view_as(
                 TypeOptions).allow_non_deterministic_key_coders))
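
The value wired into the visitor here comes from TypeOptions; assuming the
standard --allow_non_deterministic_key_coders flag (defined outside this
diff), the argument is simply its negation:

    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.options.pipeline_options import TypeOptions

    options = PipelineOptions([])  # flag defaults to False
    # Deterministic key coders are required unless the user opts out
    # with --allow_non_deterministic_key_coders.
    require_deterministic = not options.view_as(
        TypeOptions).allow_non_deterministic_key_coders
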
 
diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
index 9074d97..01d1919 100644
--- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
+++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py
@@ -45,6 +45,7 @@ from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.pvalue import PCollection
 from apache_beam.runners import DataflowRunner
 from apache_beam.runners import TestDataflowRunner
+from apache_beam.runners import common
 from apache_beam.runners import create_runner
 from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult
 from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException
@@ -334,7 +335,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
     for pcoll in [pcoll1, pcoll2, pcoll3]:
       applied = AppliedPTransform(None, beam.GroupByKey(), "label", [pcoll])
       applied.outputs[None] = PCollection(None)
-      DataflowRunner.group_by_key_input_visitor().visit_transform(applied)
+      common.group_by_key_input_visitor().visit_transform(applied)
       self.assertEqual(
           pcoll.element_type, typehints.KV[typehints.Any, typehints.Any])
 
@@ -350,7 +351,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
         "Found .*")
     for pcoll in [pcoll1, pcoll2]:
       with self.assertRaisesRegex(ValueError, err_msg):
-        DataflowRunner.group_by_key_input_visitor().visit_transform(
+        common.group_by_key_input_visitor().visit_transform(
             AppliedPTransform(None, beam.GroupByKey(), "label", [pcoll]))
 
   def test_group_by_key_input_visitor_for_non_gbk_transforms(self):
@@ -358,7 +359,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
     pcoll = PCollection(p)
     for transform in [beam.Flatten(), beam.Map(lambda x: x)]:
       pcoll.element_type = typehints.Any
-      DataflowRunner.group_by_key_input_visitor().visit_transform(
+      common.group_by_key_input_visitor().visit_transform(
           AppliedPTransform(None, transform, "label", [pcoll]))
       self.assertEqual(pcoll.element_type, typehints.Any)
 
@@ -398,7 +399,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin):
     # to make sure the check below is not vacuous.
     self.assertNotIsInstance(flat.element_type, typehints.TupleConstraint)
 
-    p.visit(DataflowRunner.group_by_key_input_visitor())
+    p.visit(common.group_by_key_input_visitor())
     p.visit(DataflowRunner.flatten_input_visitor())
 
     # The dataflow runner requires gbk input to be tuples *and* flatten
diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
index 4b47b5e..9850cdb 100644
--- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
+++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py
@@ -57,6 +57,7 @@ from apache_beam.portability.api import beam_fn_api_pb2
 from apache_beam.portability.api import beam_provision_api_pb2
 from apache_beam.portability.api import beam_runner_api_pb2
 from apache_beam.runners import runner
+from apache_beam.runners.common import group_by_key_input_visitor
 from apache_beam.runners.portability import portable_metrics
 from apache_beam.runners.portability.fn_api_runner import execution
 from apache_beam.runners.portability.fn_api_runner import translations
@@ -153,10 +154,8 @@ class FnApiRunner(runner.PipelineRunner):
     # This is sometimes needed if type checking is disabled
     # to enforce that the inputs (and outputs) of GroupByKey operations
     # are known to be KVs.
-    from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner
-    # TODO: Move group_by_key_input_visitor() to a non-dataflow specific file.
     pipeline.visit(
-        DataflowRunner.group_by_key_input_visitor(
+        group_by_key_input_visitor(
             not options.view_as(pipeline_options.TypeOptions).
             allow_non_deterministic_key_coders))
     self._bundle_repeat = self._bundle_repeat or options.view_as(
@@ -179,6 +178,15 @@ class FnApiRunner(runner.PipelineRunner):
       self._default_environment = environments.SubprocessSDKEnvironment(
           command_string=command_string)
 
+    if running_mode == 'in_memory' and self._num_workers != 1:
+      _LOGGER.warning(
+          'If direct_num_workers is not equal to 1, direct_running_mode '
+          'should be `multi_processing` or `multi_threading` instead of '
+          '`in_memory` in order for it to have the desired worker parallelism '
+          'effect. direct_num_workers: %d ; running_mode: %s',
+          self._num_workers,
+          running_mode)
+
     self._profiler_factory = Profile.factory_from_options(
         options.view_as(pipeline_options.ProfilingOptions))
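
The new warning fires when the FnApiRunner is configured with the in_memory
running mode but more than one worker. A sketch of DirectRunner options that
avoid it (option names taken from the warning text; the values are
illustrative):

    from apache_beam.options.pipeline_options import PipelineOptions

    # direct_num_workers > 1 only has a parallelism effect when the
    # running mode is multi_processing or multi_threading.
    options = PipelineOptions([
        '--direct_num_workers=4',
        '--direct_running_mode=multi_processing',
    ])
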
 

