From commits-return-113216-archive-asf-public=cust-asf.ponee.io@beam.apache.org Sat Mar 6 09:47:50 2021 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mxout1-he-de.apache.org (mxout1-he-de.apache.org [95.216.194.37]) by mx-eu-01.ponee.io (Postfix) with ESMTPS id 08F3118065C for ; Sat, 6 Mar 2021 10:47:50 +0100 (CET) Received: from mail.apache.org (mailroute1-lw-us.apache.org [207.244.88.153]) by mxout1-he-de.apache.org (ASF Mail Server at mxout1-he-de.apache.org) with SMTP id 6670662C3F for ; Sat, 6 Mar 2021 09:47:49 +0000 (UTC) Received: (qmail 28849 invoked by uid 500); 6 Mar 2021 09:47:48 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 28831 invoked by uid 99); 6 Mar 2021 09:47:48 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 06 Mar 2021 09:47:48 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 4525681760; Sat, 6 Mar 2021 09:47:48 +0000 (UTC) Date: Sat, 06 Mar 2021 09:47:44 +0000 To: "commits@beam.apache.org" Subject: [beam] branch master updated: [BEAM-11591] Create pypi dependencies registry and populate environment proto MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <161502406037.8398.13717452293364464887@gitbox.apache.org> From: heejong@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: beam X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 33d59b34bcf789f196e23d259ca52d2fa057d3a8 X-Git-Newrev: c5127a89dcef2d8202eaded7732162326a39260b X-Git-Rev: c5127a89dcef2d8202eaded7732162326a39260b X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. heejong pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new c5127a8 [BEAM-11591] Create pypi dependencies registry and populate environment proto new a0dbc6f Merge pull request #13728 from ihji/BEAM-11591 c5127a8 is described below commit c5127a89dcef2d8202eaded7732162326a39260b Author: Heejong Lee AuthorDate: Fri Jan 8 20:14:31 2021 -0800 [BEAM-11591] Create pypi dependencies registry and populate environment proto --- .../runners/portability/expansion_service.py | 5 +-- sdks/python/apache_beam/transforms/environments.py | 41 ++++++++++------------ 2 files changed, 21 insertions(+), 25 deletions(-) diff --git a/sdks/python/apache_beam/runners/portability/expansion_service.py b/sdks/python/apache_beam/runners/portability/expansion_service.py index 2536ef3..0071ec5 100644 --- a/sdks/python/apache_beam/runners/portability/expansion_service.py +++ b/sdks/python/apache_beam/runners/portability/expansion_service.py @@ -39,6 +39,8 @@ class ExpansionServiceServicer( def __init__(self, options=None): self._options = options or beam_pipeline.PipelineOptions( environment_type=python_urns.EMBEDDED_PYTHON, sdk_location='container') + self._default_environment = ( + portable_runner.PortableRunner._create_environment(self._options)) def Expand(self, request, context=None): try: @@ -54,8 +56,7 @@ class ExpansionServiceServicer( context = pipeline_context.PipelineContext( request.components, - default_environment=portable_runner.PortableRunner. - _create_environment(self._options), + default_environment=self._default_environment, namespace=request.namespace) producers = { pcoll_id: (context.transforms.get_by_id(t_id), pcoll_tag) diff --git a/sdks/python/apache_beam/transforms/environments.py b/sdks/python/apache_beam/transforms/environments.py index d8c5ffb..8b21f6c 100644 --- a/sdks/python/apache_beam/transforms/environments.py +++ b/sdks/python/apache_beam/transforms/environments.py @@ -36,6 +36,7 @@ from typing import Iterator from typing import List from typing import Mapping from typing import Optional +from typing import Set from typing import Tuple from typing import Type from typing import TypeVar @@ -66,7 +67,7 @@ __all__ = [ 'EmbeddedPythonEnvironment', 'EmbeddedPythonGrpcEnvironment', 'SubprocessSDKEnvironment', - 'RunnerAPIEnvironmentHolder' + 'PyPIArtifactRegistry' ] T = TypeVar('T') @@ -680,29 +681,17 @@ class SubprocessSDKEnvironment(Environment): artifacts=python_sdk_dependencies(options)) -class RunnerAPIEnvironmentHolder(Environment): - def __init__(self, proto): - # type: (beam_runner_api_pb2.Environment) -> None - self.proto = proto +class PyPIArtifactRegistry(object): + _registered_artifacts = set() # type: Set[Tuple[str, str]] - def to_runner_api(self, context): - # type: (PipelineContext) -> beam_runner_api_pb2.Environment - return self.proto - - def capabilities(self): - # type: () -> Iterable[str] - return self.proto.capabilities - - def __eq__(self, other): - return self.__class__ == other.__class__ and self.proto == other.proto - - def __ne__(self, other): - # TODO(BEAM-5949): Needed for Python 2 compatibility. - return not self == other + @classmethod + def register_artifact(cls, name, version): + cls._registered_artifacts.add((name, version)) - def __hash__(self): - # type: () -> int - return hash((self.__class__, self.proto)) + @classmethod + def get_artifacts(cls): + for artifact in cls._registered_artifacts: + yield artifact def python_sdk_capabilities(): @@ -730,4 +719,10 @@ def python_sdk_dependencies(options, tmp_dir=None): skip_prestaged_dependencies = options.view_as( SetupOptions).prebuild_sdk_container_engine is not None return stager.Stager.create_job_resources( - options, tmp_dir, skip_prestaged_dependencies=skip_prestaged_dependencies) + options, + tmp_dir, + pypi_requirements=[ + artifact[0] + artifact[1] + for artifact in PyPIArtifactRegistry.get_artifacts() + ], + skip_prestaged_dependencies=skip_prestaged_dependencies)