Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id CC41F200CCB for ; Thu, 15 Jun 2017 03:19:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id CAFF3160BDB; Thu, 15 Jun 2017 01:19:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id E935F160BE8 for ; Thu, 15 Jun 2017 03:19:29 +0200 (CEST) Received: (qmail 19816 invoked by uid 500); 15 Jun 2017 01:19:29 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 19806 invoked by uid 99); 15 Jun 2017 01:19:28 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 15 Jun 2017 01:19:28 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 15C05DFF15; Thu, 15 Jun 2017 01:19:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: robertwb@apache.org To: commits@beam.apache.org Date: Thu, 15 Jun 2017 01:19:29 -0000 Message-Id: <8ec3925d8b194c28bd4a86105de3b51e@git.apache.org> In-Reply-To: <24cf33fcb7d44ff3886c1f8ecf22bc58@git.apache.org> References: <24cf33fcb7d44ff3886c1f8ecf22bc58@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [2/2] beam git commit: [BEAM-1585] Add beam plugins as pipeline options archived-at: Thu, 15 Jun 2017 01:19:31 -0000 [BEAM-1585] Add beam plugins as pipeline options Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/329bf1e7 Tree: 
http://git-wip-us.apache.org/repos/asf/beam/tree/329bf1e7 Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/329bf1e7 Branch: refs/heads/master Commit: 329bf1e775c29b84a498fc106342fddd6e11f0b6 Parents: c962083 Author: Sourabh Bajaj Authored: Wed Jun 14 16:35:45 2017 -0700 Committer: Robert Bradshaw Committed: Wed Jun 14 18:18:52 2017 -0700 ---------------------------------------------------------------------- sdks/python/apache_beam/io/filesystem.py | 14 ++----- .../apache_beam/options/pipeline_options.py | 8 ++++ .../runners/dataflow/dataflow_runner.py | 10 +++++ sdks/python/apache_beam/utils/plugin.py | 42 ++++++++++++++++++++ 4 files changed, 63 insertions(+), 11 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/io/filesystem.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/filesystem.py b/sdks/python/apache_beam/io/filesystem.py index db6a1d0..f553026 100644 --- a/sdks/python/apache_beam/io/filesystem.py +++ b/sdks/python/apache_beam/io/filesystem.py @@ -26,6 +26,8 @@ import zlib import logging import time +from apache_beam.utils.plugin import BeamPlugin + logger = logging.getLogger(__name__) DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 @@ -409,7 +411,7 @@ class BeamIOError(IOError): self.exception_details = exception_details -class FileSystem(object): +class FileSystem(BeamPlugin): """A class that defines the functions that can be performed on a filesystem. 
All methods are abstract and they are for file system providers to @@ -429,16 +431,6 @@ class FileSystem(object): return compression_type @classmethod - def get_all_subclasses(cls): - """Get all the subclasses of the FileSystem class - """ - all_subclasses = [] - for subclass in cls.__subclasses__(): - all_subclasses.append(subclass) - all_subclasses.extend(subclass.get_all_subclasses()) - return all_subclasses - - @classmethod def scheme(cls): """URI scheme for the FileSystem """ http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/options/pipeline_options.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/options/pipeline_options.py b/sdks/python/apache_beam/options/pipeline_options.py index 8598e05..283b340 100644 --- a/sdks/python/apache_beam/options/pipeline_options.py +++ b/sdks/python/apache_beam/options/pipeline_options.py @@ -545,6 +545,14 @@ class SetupOptions(PipelineOptions): 'worker will install the resulting package before running any custom ' 'code.')) parser.add_argument( + '--beam_plugins', + default=None, + help= + ('Bootstrap the python process before executing any code by importing ' + 'all the plugins used in the pipeline. Please pass a comma separated ' + 'list of import paths to be included.
This is currently an ' + 'experimental flag and provides no stability.')) + parser.add_argument( '--save_main_session', default=False, action='store_true', http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py index d6944b2..cc9274e 100644 --- a/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py +++ b/sdks/python/apache_beam/runners/dataflow/dataflow_runner.py @@ -46,6 +46,8 @@ from apache_beam.runners.runner import PipelineState from apache_beam.transforms.display import DisplayData from apache_beam.typehints import typehints from apache_beam.options.pipeline_options import StandardOptions +from apache_beam.options.pipeline_options import SetupOptions +from apache_beam.utils.plugin import BeamPlugin __all__ = ['DataflowRunner'] @@ -226,6 +228,14 @@ class DataflowRunner(PipelineRunner): raise ImportError( 'Google Cloud Dataflow runner not available, ' 'please install apache_beam[gcp]') + + # Add setup_options for all the BeamPlugin imports + setup_options = pipeline._options.view_as(SetupOptions) + plugins = BeamPlugin.get_all_plugin_paths() + if setup_options.beam_plugins is not None: + plugins = list(set(plugins + setup_options.beam_plugins.split(','))) + setup_options.beam_plugins = plugins + self.job = apiclient.Job(pipeline._options) # Dataflow runner requires a KV type for GBK inputs, hence we enforce that http://git-wip-us.apache.org/repos/asf/beam/blob/329bf1e7/sdks/python/apache_beam/utils/plugin.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/utils/plugin.py b/sdks/python/apache_beam/utils/plugin.py new file mode 100644 index 0000000..563b93c --- /dev/null +++ b/sdks/python/apache_beam/utils/plugin.py @@ -0,0 +1,42 @@ 
+# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""A BeamPlugin base class. + +For experimental usage only; no backwards-compatibility guarantees. +""" + + +class BeamPlugin(object): + """Plugin base class to be extended by dependent users such as FileSystem. + Any instantiated subclass will be imported at worker startup time.""" + + @classmethod + def get_all_subclasses(cls): + """Get all the subclasses of the BeamPlugin class.""" + all_subclasses = [] + for subclass in cls.__subclasses__(): + all_subclasses.append(subclass) + all_subclasses.extend(subclass.get_all_subclasses()) + return all_subclasses + + @classmethod + def get_all_plugin_paths(cls): + """Get full import paths of the BeamPlugin subclass.""" + def fullname(o): + return o.__module__ + "." + o.__name__ + return [fullname(o) for o in cls.get_all_subclasses()]