From commits-return-113179-archive-asf-public=cust-asf.ponee.io@beam.apache.org Mon Mar 1 22:03:57 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 1FC7318062C for ; Mon, 1 Mar 2021 23:03:57 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id 79E6C64383 for ; Mon, 1 Mar 2021 22:03:56 +0000 (UTC) Received: (qmail 21794 invoked by uid 500); 1 Mar 2021 22:03:55 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 21784 invoked by uid 99); 1 Mar 2021 22:03:55 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 01 Mar 2021 22:03:55 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 39207816A5; Mon, 1 Mar 2021 22:03:55 +0000 (UTC) Date: Mon, 01 Mar 2021 22:03:51 +0000 To: "commits@beam.apache.org" Subject: [beam] branch master updated: Adding a warning to use multi-workers on FnApiRunner MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <161463622713.2478.10284684423321769564@gitbox.apache.org> From: pabloem@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: beam X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 3dba1d3c3920fb918c0bf85cfd4b7eae4ac9dee7 X-Git-Newrev: 21a11fd7198fefc6122f54af2a98b338db222071 X-Git-Rev: 21a11fd7198fefc6122f54af2a98b338db222071 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 21a11fd Adding a warning to use multi-workers on FnApiRunner new 0aa07a5 Merge pull request #14108 from pabloem/fnwarning 21a11fd is described below commit 21a11fd7198fefc6122f54af2a98b338db222071 Author: Pablo Estrada AuthorDate: Fri Feb 26 14:35:53 2021 -0800 Adding a warning to use multi-workers on FnApiRunner --- sdks/python/apache_beam/runners/common.py | 39 ++++++++++++++++++++++ .../runners/dataflow/dataflow_runner.py | 38 ++------------------- .../runners/dataflow/dataflow_runner_test.py | 9 ++--- .../runners/portability/fn_api_runner/fn_runner.py | 14 ++++++-- 4 files changed, 57 insertions(+), 43 deletions(-) diff --git a/sdks/python/apache_beam/runners/common.py b/sdks/python/apache_beam/runners/common.py index 745cbc1..a929e6b 100644 --- a/sdks/python/apache_beam/runners/common.py +++ b/sdks/python/apache_beam/runners/common.py @@ -62,6 +62,7 @@ from apache_beam.transforms.core import WatermarkEstimatorProvider from apache_beam.transforms.window import GlobalWindow from apache_beam.transforms.window import TimestampedValue from apache_beam.transforms.window import WindowFn +from apache_beam.typehints import typehints from apache_beam.utils.counters import Counter from apache_beam.utils.counters import CounterName from apache_beam.utils.timestamp import Timestamp @@ -1511,3 +1512,41 @@ class DoFnContext(object): raise AttributeError('windows not accessible in this context') else: return self.windowed_value.windows + + +def group_by_key_input_visitor(deterministic_key_coders=True): + # Importing here to avoid a circular dependency + from apache_beam.pipeline import PipelineVisitor + from apache_beam.transforms.core import GroupByKey + + class GroupByKeyInputVisitor(PipelineVisitor): + """A visitor that replaces `Any` element type for input `PCollection` of + a `GroupByKey` with a `KV` type. + + TODO(BEAM-115): Once Python SDK is compatible with the new Runner API, + we could directly replace the coder instead of mutating the element type. + """ + def __init__(self, deterministic_key_coders=True): + self.deterministic_key_coders = deterministic_key_coders + + def enter_composite_transform(self, transform_node): + self.visit_transform(transform_node) + + def visit_transform(self, transform_node): + # Imported here to avoid circular dependencies. + # pylint: disable=wrong-import-order, wrong-import-position + if isinstance(transform_node.transform, GroupByKey): + pcoll = transform_node.inputs[0] + pcoll.element_type = typehints.coerce_to_kv_type( + pcoll.element_type, transform_node.full_label) + pcoll.requires_deterministic_key_coder = ( + self.deterministic_key_coders and transform_node.full_label) + key_type, value_type = pcoll.element_type.tuple_types + if transform_node.outputs: + key = next(iter(transform_node.outputs.keys())) + transform_node.outputs[key].element_type = typehints.KV[ + key_type, typehints.Iterable[value_type]] + transform_node.outputs[key].requires_deterministic_key_coder = ( + self.deterministic_key_coders and transform_node.full_label) + + return GroupByKeyInputVisitor(deterministic_key_coders) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index 892284f..e8b2e19 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -57,6 +57,7 @@ from apache_beam.portability import common_urns from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.pvalue import AsSideInput from apache_beam.runners.common import DoFnSignature +from apache_beam.runners.common import group_by_key_input_visitor from apache_beam.runners.dataflow.internal import names from apache_beam.runners.dataflow.internal.clients import dataflow as dataflow_api from apache_beam.runners.dataflow.internal.names import PropertyNames @@ -271,41 +272,6 @@ class DataflowRunner(PipelineRunner): return element @staticmethod - def group_by_key_input_visitor(deterministic_key_coders=True): - # Imported here to avoid circular dependencies. - from apache_beam.pipeline import PipelineVisitor - - class GroupByKeyInputVisitor(PipelineVisitor): - """A visitor that replaces `Any` element type for input `PCollection` of - a `GroupByKey` with a `KV` type. - - TODO(BEAM-115): Once Python SDk is compatible with the new Runner API, - we could directly replace the coder instead of mutating the element type. - """ - def enter_composite_transform(self, transform_node): - self.visit_transform(transform_node) - - def visit_transform(self, transform_node): - # Imported here to avoid circular dependencies. - # pylint: disable=wrong-import-order, wrong-import-position - from apache_beam.transforms.core import GroupByKey - if isinstance(transform_node.transform, GroupByKey): - pcoll = transform_node.inputs[0] - pcoll.element_type = typehints.coerce_to_kv_type( - pcoll.element_type, transform_node.full_label) - pcoll.requires_deterministic_key_coder = ( - deterministic_key_coders and transform_node.full_label) - key_type, value_type = pcoll.element_type.tuple_types - if transform_node.outputs: - key = DataflowRunner._only_element(transform_node.outputs.keys()) - transform_node.outputs[key].element_type = typehints.KV[ - key_type, typehints.Iterable[value_type]] - transform_node.outputs[key].requires_deterministic_key_coder = ( - deterministic_key_coders and transform_node.full_label) - - return GroupByKeyInputVisitor() - - @staticmethod def side_input_visitor( use_unified_worker=False, use_fn_api=False, @@ -445,7 +411,7 @@ class DataflowRunner(PipelineRunner): # Dataflow runner requires a KV type for GBK inputs, hence we enforce that # here. pipeline.visit( - self.group_by_key_input_visitor( + group_by_key_input_visitor( not pipeline._options.view_as( TypeOptions).allow_non_deterministic_key_coders)) diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py index 9074d97..01d1919 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner_test.py @@ -45,6 +45,7 @@ from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.pvalue import PCollection from apache_beam.runners import DataflowRunner from apache_beam.runners import TestDataflowRunner +from apache_beam.runners import common from apache_beam.runners import create_runner from apache_beam.runners.dataflow.dataflow_runner import DataflowPipelineResult from apache_beam.runners.dataflow.dataflow_runner import DataflowRuntimeException @@ -334,7 +335,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin): for pcoll in [pcoll1, pcoll2, pcoll3]: applied = AppliedPTransform(None, beam.GroupByKey(), "label", [pcoll]) applied.outputs[None] = PCollection(None) - DataflowRunner.group_by_key_input_visitor().visit_transform(applied) + common.group_by_key_input_visitor().visit_transform(applied) self.assertEqual( pcoll.element_type, typehints.KV[typehints.Any, typehints.Any]) @@ -350,7 +351,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin): "Found .*") for pcoll in [pcoll1, pcoll2]: with self.assertRaisesRegex(ValueError, err_msg): - DataflowRunner.group_by_key_input_visitor().visit_transform( + common.group_by_key_input_visitor().visit_transform( AppliedPTransform(None, beam.GroupByKey(), "label", [pcoll])) def test_group_by_key_input_visitor_for_non_gbk_transforms(self): @@ -358,7 +359,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin): pcoll = PCollection(p) for transform in [beam.Flatten(), beam.Map(lambda x: x)]: pcoll.element_type = typehints.Any - DataflowRunner.group_by_key_input_visitor().visit_transform( + common.group_by_key_input_visitor().visit_transform( AppliedPTransform(None, transform, "label", [pcoll])) self.assertEqual(pcoll.element_type, typehints.Any) @@ -398,7 +399,7 @@ class DataflowRunnerTest(unittest.TestCase, ExtraAssertionsMixin): # to make sure the check below is not vacuous. self.assertNotIsInstance(flat.element_type, typehints.TupleConstraint) - p.visit(DataflowRunner.group_by_key_input_visitor()) + p.visit(common.group_by_key_input_visitor()) p.visit(DataflowRunner.flatten_input_visitor()) # The dataflow runner requires gbk input to be tuples *and* flatten diff --git a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py index 4b47b5e..9850cdb 100644 --- a/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py +++ b/sdks/python/apache_beam/runners/portability/fn_api_runner/fn_runner.py @@ -57,6 +57,7 @@ from apache_beam.portability.api import beam_fn_api_pb2 from apache_beam.portability.api import beam_provision_api_pb2 from apache_beam.portability.api import beam_runner_api_pb2 from apache_beam.runners import runner +from apache_beam.runners.common import group_by_key_input_visitor from apache_beam.runners.portability import portable_metrics from apache_beam.runners.portability.fn_api_runner import execution from apache_beam.runners.portability.fn_api_runner import translations @@ -153,10 +154,8 @@ class FnApiRunner(runner.PipelineRunner): # This is sometimes needed if type checking is disabled # to enforce that the inputs (and outputs) of GroupByKey operations # are known to be KVs. - from apache_beam.runners.dataflow.dataflow_runner import DataflowRunner - # TODO: Move group_by_key_input_visitor() to a non-dataflow specific file. pipeline.visit( - DataflowRunner.group_by_key_input_visitor( + group_by_key_input_visitor( not options.view_as(pipeline_options.TypeOptions). allow_non_deterministic_key_coders)) self._bundle_repeat = self._bundle_repeat or options.view_as( @@ -179,6 +178,15 @@ class FnApiRunner(runner.PipelineRunner): self._default_environment = environments.SubprocessSDKEnvironment( command_string=command_string) + if running_mode == 'in_memory' and self._num_workers != 1: + _LOGGER.warning( + 'If direct_num_workers is not equal to 1, direct_running_mode ' + 'should be `multi_processing` or `multi_threading` instead of ' + '`in_memory` in order for it to have the desired worker parallelism ' + 'effect. direct_num_workers: %d ; running_mode: %s', + self._num_workers, + running_mode) + self._profiler_factory = Profile.factory_from_options( options.view_as(pipeline_options.ProfilingOptions))