From: altay@apache.org
To: commits@beam.apache.org
Date: Wed, 15 Mar 2017 21:49:52 -0000
Message-Id: <28a0b2d91d594263bf16edb0fa24125a@git.apache.org>
Subject: [1/2] beam git commit: Revert "Add ValueProvider class for FileBasedSource I/O Transforms"

Repository: beam
Updated Branches:
  refs/heads/master 88c2fe18d -> 477e789a4


Revert "Add ValueProvider class for FileBasedSource I/O Transforms"

This reverts commit 6d1c9043ab0d929e2cea8a3a68192e47a5cc55cd.
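For context, the reverted commit let pipeline options be declared as deferred ValueProviders instead of plain argparse values. A minimal sketch of that now-removed usage, reconstructed from the wordcount.py and pipeline_options.py hunks below (it does not run against Beam once this revert is applied):

# Sketch of the API removed by this revert, pieced together from the
# diff below; not valid Beam API after the revert.
from apache_beam.utils.pipeline_options import PipelineOptions


class WordcountOptions(PipelineOptions):
  @classmethod
  def _add_argparse_args(cls, parser):
    # add_value_provider_argument (added by the reverted commit) parses
    # '--input' into a StaticValueProvider when a value is supplied at
    # construction time, or a RuntimeValueProvider placeholder otherwise.
    parser.add_value_provider_argument(
        '--input',
        default='gs://dataflow-samples/shakespeare/kinglear.txt',
        help='Input file to process.')


options = PipelineOptions(['--input', 'data.txt']).view_as(WordcountOptions)
assert options.input.is_accessible()       # static: value known now
assert options.input.get() == 'data.txt'

deferred = PipelineOptions([]).view_as(WordcountOptions)
# Without a command-line value the option is a RuntimeValueProvider:
# .get() raises RuntimeError until the runner calls
# RuntimeValueProvider.set_runtime_options(...) at execution time.
assert not deferred.input.is_accessible()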
Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02454134
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02454134
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02454134

Branch: refs/heads/master
Commit: 02454134fbbac2ce28b5b9507a2077ac09348f88
Parents: 88c2fe1
Author: Sourabh Bajaj
Authored: Wed Mar 15 14:33:19 2017 -0700
Committer: Ahmet Altay
Committed: Wed Mar 15 14:49:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/wordcount.py   |  31 ++--
 .../apache_beam/internal/gcp/json_value.py      |   6 -
 sdks/python/apache_beam/io/filebasedsource.py   |  46 ++----
 .../apache_beam/io/filebasedsource_test.py      |  28 +---
 sdks/python/apache_beam/io/fileio.py            |  44 ++----
 sdks/python/apache_beam/io/fileio_test.py       |  45 ++----
 sdks/python/apache_beam/transforms/display.py   |   1 -
 .../apache_beam/transforms/display_test.py      |  36 -----
 .../apache_beam/utils/pipeline_options.py       |  81 +---------
 .../apache_beam/utils/pipeline_options_test.py  |  50 +------
 sdks/python/apache_beam/utils/value_provider.py | 108 --------------
 .../apache_beam/utils/value_provider_test.py    | 146 -------------------
 12 files changed, 62 insertions(+), 560 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 9e47e08..50c0328 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -19,6 +19,7 @@
 
 from __future__ import absolute_import
 
+import argparse
 import logging
 import re
 
@@ -66,28 +67,24 @@ class WordExtractingDoFn(beam.DoFn):
 
 def run(argv=None):
   """Main entry point; defines and runs the wordcount pipeline."""
-  class WordcountOptions(PipelineOptions):
-    @classmethod
-    def _add_argparse_args(cls, parser):
-      parser.add_value_provider_argument(
-          '--input',
-          dest='input',
-          default='gs://dataflow-samples/shakespeare/kinglear.txt',
-          help='Input file to process.')
-      parser.add_value_provider_argument(
-          '--output',
-          dest='output',
-          required=True,
-          help='Output file to write results to.')
-  pipeline_options = PipelineOptions(argv)
-  wordcount_options = pipeline_options.view_as(WordcountOptions)
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      dest='output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
 
   # We use the save_main_session option because one or more DoFn's in this
   # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> ReadFromText(wordcount_options.input)
+  lines = p | 'read' >> ReadFromText(known_args.input)
 
   # Count the occurrences of each word.
   counts = (lines
@@ -102,7 +99,7 @@ def run(argv=None):
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | 'write' >> WriteToText(wordcount_options.output)
+  output | 'write' >> WriteToText(known_args.output)
 
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/internal/gcp/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py
index 4099c1a..c8b5393 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -25,8 +25,6 @@
 except ImportError:
   extra_types = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
-from apache_beam.utils.value_provider import ValueProvider
-
 _MAXINT64 = (1 << 63) - 1
 _MININT64 = - (1 << 63)
 
@@ -106,10 +104,6 @@ def to_json_value(obj, with_type=False):
       raise TypeError('Can not encode {} as a 64-bit integer'.format(obj))
   elif isinstance(obj, float):
     return extra_types.JsonValue(double_value=obj)
-  elif isinstance(obj, ValueProvider):
-    if obj.is_accessible():
-      return to_json_value(obj.get())
-    return extra_types.JsonValue(is_null=True)
   else:
     raise TypeError('Cannot convert %s to a JSON value.' % repr(obj))


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 8c87e99..582d673 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -34,10 +34,6 @@
 from apache_beam.io import fileio
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils.value_provider import ValueProvider
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import check_accessible
-
 
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 
@@ -57,8 +53,7 @@ class FileBasedSource(iobase.BoundedSource):
     """Initializes ``FileBasedSource``.
 
     Args:
-      file_pattern: the file glob to read a string or a ValueProvider
-        (placeholder to inject a runtime value).
+      file_pattern: the file glob to read.
       min_bundle_size: minimum size of bundles that should be generated when
         performing initial splitting on this source.
       compression_type: compression type to use
@@ -76,19 +71,15 @@ class FileBasedSource(iobase.BoundedSource):
         creation time.
 
     Raises:
       TypeError: when compression_type is not valid or if file_pattern is not a
-        string or a ValueProvider.
+        string.
       ValueError: when compression and splittable files are specified.
       IOError: when the file pattern specified yields an empty result.
     """
+    if not isinstance(file_pattern, basestring):
+      raise TypeError(
+          '%s: file_pattern must be a string; got %r instead' %
+          (self.__class__.__name__, file_pattern))
-
-    if (not (isinstance(file_pattern, basestring)
-             or isinstance(file_pattern, ValueProvider))):
-      raise TypeError('%s: file_pattern must be of type string'
-                      ' or ValueProvider; got %r instead'
-                      % (self.__class__.__name__, file_pattern))
-
-    if isinstance(file_pattern, basestring):
-      file_pattern = StaticValueProvider(str, file_pattern)
     self._pattern = file_pattern
     self._concat_source = None
     self._min_bundle_size = min_bundle_size
@@ -102,24 +93,21 @@ class FileBasedSource(iobase.BoundedSource):
     else:
       # We can't split compressed files efficiently so turn off splitting.
       self._splittable = False
-    if validate and file_pattern.is_accessible():
+    if validate:
       self._validate()
 
   def display_data(self):
-    return {'file_pattern': DisplayDataItem(str(self._pattern),
+    return {'file_pattern': DisplayDataItem(self._pattern,
                                             label="File Pattern"),
             'compression': DisplayDataItem(str(self._compression_type),
                                            label='Compression Type')}
 
-  @check_accessible(['_pattern'])
   def _get_concat_source(self):
     if self._concat_source is None:
-      pattern = self._pattern.get()
-
-      single_file_sources = []
-      file_names = [f for f in fileio.ChannelFactory.glob(pattern)]
+      file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
       sizes = FileBasedSource._estimate_sizes_of_files(file_names,
-                                                       pattern)
+                                                       self._pattern)
 
       # We create a reference for FileBasedSource that will be serialized along
       # with each _SingleFileSource. To prevent this FileBasedSource from having
@@ -176,16 +164,13 @@ class FileBasedSource(iobase.BoundedSource):
                                                      file_names)
     return [file_sizes[f] for f in file_names]
 
-  @check_accessible(['_pattern'])
   def _validate(self):
     """Validate if there are actual files in the specified glob pattern
    """
-    pattern = self._pattern.get()
-
    # Limit the responses as we only want to check if something exists
-    if len(fileio.ChannelFactory.glob(pattern, limit=1)) <= 0:
+    if len(fileio.ChannelFactory.glob(self._pattern, limit=1)) <= 0:
      raise IOError(
-          'No files found based on the file pattern %s' % pattern)
+          'No files found based on the file pattern %s' % self._pattern)
 
   def split(
       self, desired_bundle_size=None, start_position=None, stop_position=None):
@@ -194,11 +179,8 @@ class FileBasedSource(iobase.BoundedSource):
         start_position=start_position,
         stop_position=stop_position)
 
-  @check_accessible(['_pattern'])
   def estimate_size(self):
-    pattern = self._pattern.get()
-    file_names = [f for f in fileio.ChannelFactory.glob(pattern)]
-
+    file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
     # We're reading very few files so we can pass names file names to
     # _estimate_sizes_of_files without pattern as otherwise we'll try to do
     # optimization based on the pattern and might end up reading much more
@@ -239,7 +221,7 @@ class FileBasedSource(iobase.BoundedSource):
       defined by a given ``RangeTracker``.
 
     Returns:
-      an iterator that gives the records read from the given file.
+      a iterator that gives the records read from the given file.
    """
    raise NotImplementedError


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 51097de..7481c4c 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -43,8 +43,6 @@
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import RuntimeValueProvider
 
 
 class LineSource(FileBasedSource):
@@ -223,28 +221,6 @@ class TestFileBasedSource(unittest.TestCase):
     # environments with limited amount of resources.
     filebasedsource.MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 2
 
-  def test_string_or_value_provider_only(self):
-    str_file_pattern = tempfile.NamedTemporaryFile(delete=False).name
-    self.assertEqual(str_file_pattern,
-                     FileBasedSource(str_file_pattern)._pattern.value)
-
-    static_vp_file_pattern = StaticValueProvider(value_type=str,
-                                                 value=str_file_pattern)
-    self.assertEqual(static_vp_file_pattern,
-                     FileBasedSource(static_vp_file_pattern)._pattern)
-
-    runtime_vp_file_pattern = RuntimeValueProvider(
-        pipeline_options_subclass=object,
-        option_name='blah',
-        value_type=str,
-        default_value=str_file_pattern)
-    self.assertEqual(runtime_vp_file_pattern,
-                     FileBasedSource(runtime_vp_file_pattern)._pattern)
-
-    invalid_file_pattern = 123
-    with self.assertRaises(TypeError):
-      FileBasedSource(invalid_file_pattern)
-
   def test_validation_file_exists(self):
     file_name, _ = write_data(10)
     LineSource(file_name)
@@ -611,9 +587,7 @@ class TestSingleFileSource(unittest.TestCase):
     dd = DisplayData.create_from(fbs)
     expected_items = [
         DisplayDataItemMatcher('compression', 'auto'),
-        DisplayDataItemMatcher(
-            'file_pattern',
-            file_name)]
+        DisplayDataItemMatcher('file_pattern', file_name)]
 
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 243cda6..49a2082 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 """File-based sources and sinks."""
 
 from __future__ import absolute_import
@@ -32,9 +31,6 @@
 import zlib
 
 from apache_beam.internal import util
 from apache_beam.io import iobase
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils.value_provider import ValueProvider
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import check_accessible
 
 # TODO(sourabhbajaj): Fix the constant values after the new IO factory
 # Current constants are copy pasted from gcsio.py till we fix this.
@@ -548,30 +544,25 @@ class FileSink(iobase.Sink):
                compression_type=CompressionTypes.AUTO):
     """
     Raises:
-      TypeError: if file path parameters are not a string or ValueProvider,
-        or if compression_type is not member of CompressionTypes.
+      TypeError: if file path parameters are not a string or if compression_type
+        is not member of CompressionTypes.
       ValueError: if shard_name_template is not of expected format.
     """
-    if not (isinstance(file_path_prefix, basestring)
-            or isinstance(file_path_prefix, ValueProvider)):
-      raise TypeError('file_path_prefix must be a string or ValueProvider;'
-                      'got %r instead' % file_path_prefix)
-    if not (isinstance(file_name_suffix, basestring)
-            or isinstance(file_name_suffix, ValueProvider)):
-      raise TypeError('file_name_suffix must be a string or ValueProvider;'
-                      'got %r instead' % file_name_suffix)
+    if not isinstance(file_path_prefix, basestring):
+      raise TypeError('file_path_prefix must be a string; got %r instead' %
+                      file_path_prefix)
+    if not isinstance(file_name_suffix, basestring):
+      raise TypeError('file_name_suffix must be a string; got %r instead' %
+                      file_name_suffix)
 
     if not CompressionTypes.is_valid_compression_type(compression_type):
       raise TypeError('compression_type must be CompressionType object but '
                       'was %s' % type(compression_type))
+
     if shard_name_template is None:
       shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
     elif shard_name_template is '':
       num_shards = 1
-    if isinstance(file_path_prefix, basestring):
-      file_path_prefix = StaticValueProvider(str, file_path_prefix)
-    if isinstance(file_name_suffix, basestring):
-      file_name_suffix = StaticValueProvider(str, file_name_suffix)
     self.file_path_prefix = file_path_prefix
     self.file_name_suffix = file_name_suffix
     self.num_shards = num_shards
@@ -627,31 +618,22 @@ class FileSink(iobase.Sink):
       if file_handle is not None:
         file_handle.close()
 
-  @check_accessible(['file_path_prefix', 'file_name_suffix'])
   def initialize_write(self):
-    file_path_prefix = self.file_path_prefix.get()
-    file_name_suffix = self.file_name_suffix.get()
-    tmp_dir = file_path_prefix + file_name_suffix + time.strftime(
+    tmp_dir = self.file_path_prefix + self.file_name_suffix + time.strftime(
         '-temp-%Y-%m-%d_%H-%M-%S')
     ChannelFactory().mkdir(tmp_dir)
     return tmp_dir
 
-  @check_accessible(['file_path_prefix', 'file_name_suffix'])
   def open_writer(self, init_result, uid):
     # A proper suffix is needed for AUTO compression detection.
     # We also ensure there will be no collisions with uid and a
     # (possibly unsharded) file_path_prefix and a (possibly empty)
     # file_name_suffix.
-    file_path_prefix = self.file_path_prefix.get()
-    file_name_suffix = self.file_name_suffix.get()
     suffix = (
-        '.' + os.path.basename(file_path_prefix) + file_name_suffix)
+        '.' + os.path.basename(self.file_path_prefix) + self.file_name_suffix)
     return FileSinkWriter(self, os.path.join(init_result, uid) + suffix)
 
-  @check_accessible(['file_path_prefix', 'file_name_suffix'])
   def finalize_write(self, init_result, writer_results):
-    file_path_prefix = self.file_path_prefix.get()
-    file_name_suffix = self.file_name_suffix.get()
     writer_results = sorted(writer_results)
     num_shards = len(writer_results)
     min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
@@ -660,8 +642,8 @@ class FileSink(iobase.Sink):
     rename_ops = []
     for shard_num, shard in enumerate(writer_results):
       final_name = ''.join([
-          file_path_prefix, self.shard_name_format % dict(
-              shard_num=shard_num, num_shards=num_shards), file_name_suffix
+          self.file_path_prefix, self.shard_name_format % dict(
+              shard_num=shard_num, num_shards=num_shards), self.file_name_suffix
       ])
       rename_ops.append((shard, final_name))


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 4ca0521..a963c67 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -35,8 +35,6 @@
 from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
-from apache_beam.utils.value_provider import StaticValueProvider
-
 
 class TestChannelFactory(unittest.TestCase):
 
@@ -170,7 +168,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
   def test_file_sink_writing(self):
     temp_path = os.path.join(self._new_tempdir(), 'filesink')
     sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
 
     # Manually invoke the generic Sink API.
     init_token = sink.initialize_write()
@@ -191,8 +189,8 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
     res = list(sink.finalize_write(init_token, [res1, res2]))
 
     # Check the results.
-    shard1 = temp_path + '-00000-of-00002.output'
-    shard2 = temp_path + '-00001-of-00002.output'
+    shard1 = temp_path + '-00000-of-00002.foo'
+    shard2 = temp_path + '-00001-of-00002.foo'
     self.assertEqual(res, [shard1, shard2])
     self.assertEqual(open(shard1).read(), '[start][a][b][end]')
     self.assertEqual(open(shard2).read(), '[start][x][y][z][end]')
@@ -203,48 +201,33 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
   def test_file_sink_display_data(self):
     temp_path = os.path.join(self._new_tempdir(), 'display')
     sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
     dd = DisplayData.create_from(sink)
     expected_items = [
         DisplayDataItemMatcher(
            'compression', 'auto'),
         DisplayDataItemMatcher(
            'file_pattern',
-            '{}{}'.format(
-                temp_path,
-                '-%(shard_num)05d-of-%(num_shards)05d.output'))]
+            '{}{}'.format(temp_path,
+                          '-%(shard_num)05d-of-%(num_shards)05d.foo'))]
+
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_empty_write(self):
     temp_path = tempfile.NamedTemporaryFile().name
     sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()
-    )
-    p = TestPipeline()
-    p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-    p.run()
-    self.assertEqual(
-        open(temp_path + '-00000-of-00001.output').read(), '[start][end]')
-
-  def test_static_value_provider_empty_write(self):
-    temp_path = StaticValueProvider(value_type=str,
-                                    value=tempfile.NamedTemporaryFile().name)
-    sink = MyFileSink(
-        temp_path,
-        file_name_suffix=StaticValueProvider(value_type=str, value='.output'),
-        coder=coders.ToStringCoder()
-    )
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
     p = TestPipeline()
     p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
     p.run()
     self.assertEqual(
-        open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]')
+        open(temp_path + '-00000-of-00001.foo').read(), '[start][end]')
 
   def test_fixed_shard_write(self):
     temp_path = os.path.join(self._new_tempdir(), 'empty')
     sink = MyFileSink(
         temp_path,
-        file_name_suffix='.output',
+        file_name_suffix='.foo',
         num_shards=3,
         shard_name_template='_NN_SSS_',
         coder=coders.ToStringCoder())
@@ -254,7 +237,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
     p.run()
 
     concat = ''.join(
-        open(temp_path + '_03_%03d_.output' % shard_num).read()
+        open(temp_path + '_03_%03d_.foo' % shard_num).read()
         for shard_num in range(3))
     self.assertTrue('][a][' in concat, concat)
     self.assertTrue('][b][' in concat, concat)
@@ -262,7 +245,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
   def test_file_sink_multi_shards(self):
     temp_path = os.path.join(self._new_tempdir(), 'multishard')
     sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
 
     # Manually invoke the generic Sink API.
     init_token = sink.initialize_write()
@@ -285,7 +268,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
     res = sorted(res_second)
 
     for i in range(num_shards):
-      shard_name = '%s-%05d-of-%05d.output' % (temp_path, i, num_shards)
+      shard_name = '%s-%05d-of-%05d.foo' % (temp_path, i, num_shards)
       uuid = 'uuid-%05d' % i
       self.assertEqual(res[i], shard_name)
       self.assertEqual(
@@ -297,7 +280,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
   def test_file_sink_io_error(self):
     temp_path = os.path.join(self._new_tempdir(), 'ioerror')
     sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
 
     # Manually invoke the generic Sink API.
     init_token = sink.initialize_write()


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/transforms/display.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index f2ce0fc..2ced1af 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -40,7 +40,6 @@
 from datetime import datetime, timedelta
 import inspect
 import json
 
-
 __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/transforms/display_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index 7d1130b..5e106e5 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -114,42 +114,6 @@ class DisplayDataTest(unittest.TestCase):
     with self.assertRaises(ValueError):
       DisplayData.create_from_options(MyDisplayComponent())
 
-  def test_value_provider_display_data(self):
-    class TestOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--int_flag',
-            type=int,
-            help='int_flag description')
-        parser.add_value_provider_argument(
-            '--str_flag',
-            type=str,
-            default='hello',
-            help='str_flag description')
-        parser.add_value_provider_argument(
-            '--float_flag',
-            type=float,
-            help='float_flag description')
-    options = TestOptions(['--int_flag', '1'])
-    items = DisplayData.create_from_options(options).items
-    expected_items = [
-        DisplayDataItemMatcher(
-            'int_flag',
-            '1'),
-        DisplayDataItemMatcher(
-            'str_flag',
-            'RuntimeValueProvider(option: str_flag,'
-            ' type: str, default_value: \'hello\')'
-        ),
-        DisplayDataItemMatcher(
-            'float_flag',
-            'RuntimeValueProvider(option: float_flag,'
-            ' type: float, default_value: None)'
-        )
-    ]
-    hc.assert_that(items, hc.contains_inanyorder(*expected_items))
-
   def test_create_list_display_data(self):
     flags = ['--extra_package', 'package1', '--extra_package', 'package2']
     pipeline_options = PipelineOptions(flags=flags)


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
index 3b5f973..d07c328 100644
--- a/sdks/python/apache_beam/utils/pipeline_options.py
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -20,75 +20,6 @@
 import argparse
 
 from apache_beam.transforms.display import HasDisplayData
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import RuntimeValueProvider
-from apache_beam.utils.value_provider import ValueProvider
-
-
-def _static_value_provider_of(value_type):
-  """"Helper function to plug a ValueProvider into argparse.
-
-  Args:
-    value_type: the type of the value. Since the type param of argparse's
-                add_argument will always be ValueProvider, we need to
-                preserve the type of the actual value.
-  Returns:
-    A partially constructed StaticValueProvider in the form of a function.
-
-  """
-  def _f(value):
-    _f.func_name = value_type.__name__
-    return StaticValueProvider(value_type, value)
-  return _f
-
-
-class BeamArgumentParser(argparse.ArgumentParser):
-  """An ArgumentParser that supports ValueProvider options.
-
-  Example Usage::
-
-    class TemplateUserOptions(PipelineOptions):
-      @classmethod
-
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument('--vp-arg1', default='start')
-        parser.add_value_provider_argument('--vp-arg2')
-        parser.add_argument('--non-vp-arg')
-
-  """
-  def add_value_provider_argument(self, *args, **kwargs):
-    """ValueProvider arguments can be either of type keyword or positional.
-    At runtime, even positional arguments will need to be supplied in the
-    key/value form.
-    """
-    # Extract the option name from positional argument ['pos_arg']
-    assert args != () and len(args[0]) >= 1
-    if args[0][0] != '-':
-      option_name = args[0]
-      if kwargs.get('nargs') is None:  # make them optionally templated
-        kwargs['nargs'] = '?'
-    else:
-      # or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]
-      option_name = [i.replace('--', '') for i in args if i[:2] == '--'][0]
-
-    # reassign the type to make room for using
-    # StaticValueProvider as the type for add_argument
-    value_type = kwargs.get('type') or str
-    kwargs['type'] = _static_value_provider_of(value_type)
-
-    # reassign default to default_value to make room for using
-    # RuntimeValueProvider as the default for add_argument
-    default_value = kwargs.get('default')
-    kwargs['default'] = RuntimeValueProvider(
-        pipeline_options_subclass=(self.pipeline_options_subclass
-                                   or PipelineOptions),
-        option_name=option_name,
-        value_type=value_type,
-        default_value=default_value,
-    )
-
-    # have add_argument do most of the work
-    self.add_argument(*args, **kwargs)
 
 
 class PipelineOptions(HasDisplayData):
@@ -136,13 +67,11 @@ class PipelineOptions(HasDisplayData):
     """
     self._flags = flags
     self._all_options = kwargs
-    parser = BeamArgumentParser()
-
+    parser = argparse.ArgumentParser()
     for cls in type(self).mro():
       if cls == PipelineOptions:
         break
       elif '_add_argparse_args' in cls.__dict__:
-        parser.pipeline_options_subclass = cls
         cls._add_argparse_args(parser)
     # The _visible_options attribute will contain only those options from the
     # flags (i.e., command line) that can be recognized. The _all_options
@@ -190,13 +119,13 @@ class PipelineOptions(HasDisplayData):
     # TODO(BEAM-1319): PipelineOption sub-classes in the main session might be
     # repeated. Pick last unique instance of each subclass to avoid conflicts.
+    parser = argparse.ArgumentParser()
     subset = {}
-    parser = BeamArgumentParser()
     for cls in PipelineOptions.__subclasses__():
       subset[str(cls)] = cls
     for cls in subset.values():
-      parser.pipeline_options_subclass = cls
       cls._add_argparse_args(parser)  # pylint: disable=protected-access
+
     known_args, _ = parser.parse_known_args(self._flags)
     result = vars(known_args)
 
@@ -204,9 +133,7 @@ class PipelineOptions(HasDisplayData):
     for k in result.keys():
       if k in self._all_options:
         result[k] = self._all_options[k]
-      if (drop_default and
-          parser.get_default(k) == result[k] and
-          not isinstance(parser.get_default(k), ValueProvider)):
+      if drop_default and parser.get_default(k) == result[k]:
         del result[k]
 
     return result


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
index f1df6a1..507a827 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -24,8 +24,6 @@
 import hamcrest as hc
 
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import RuntimeValueProvider
 
 
 class PipelineOptionsTest(unittest.TestCase):
@@ -133,7 +131,7 @@ class PipelineOptionsTest(unittest.TestCase):
     options.view_as(PipelineOptionsTest.MockOptions).mock_flag = True
 
     self.assertEqual(options.get_all_options()['num_workers'], 5)
-    self.assertTrue(options.get_all_options()['mock_flag'])
+    self.assertEqual(options.get_all_options()['mock_flag'], True)
 
   def test_experiments(self):
     options = PipelineOptions(['--experiment', 'abc', '--experiment', 'def'])
@@ -187,51 +185,7 @@ class PipelineOptionsTest(unittest.TestCase):
       parser.add_argument('--redefined_flag', action='store_true')
 
     options = PipelineOptions(['--redefined_flag'])
-    self.assertTrue(options.get_all_options()['redefined_flag'])
-
-  def test_value_provider_options(self):
-    class UserOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This flag is a value provider')
-
-        parser.add_value_provider_argument(
-            '--vp_arg2',
-            default=1,
-            type=int)
-
-        parser.add_argument(
-            '--non_vp_arg',
-            default=1,
-            type=int
-        )
-
-    # Provide values: if not provided, the option becomes of the type runtime vp
-    options = UserOptions(['--vp_arg', 'hello'])
-    self.assertIsInstance(options.vp_arg, StaticValueProvider)
-    self.assertIsInstance(options.vp_arg2, RuntimeValueProvider)
-    self.assertIsInstance(options.non_vp_arg, int)
-
-    # Values can be overwritten
-    options = UserOptions(vp_arg=5,
-                          vp_arg2=StaticValueProvider(value_type=str,
-                                                      value='bye'),
-                          non_vp_arg=RuntimeValueProvider(
-                              pipeline_options_subclass=UserOptions,
-                              option_name='foo',
-                              value_type=int,
-                              default_value=10))
-    self.assertEqual(options.vp_arg, 5)
-    self.assertTrue(options.vp_arg2.is_accessible(),
-                    '%s is not accessible' % options.vp_arg2)
-    self.assertEqual(options.vp_arg2.get(), 'bye')
-    self.assertFalse(options.non_vp_arg.is_accessible())
-
-    with self.assertRaises(RuntimeError):
-      options.non_vp_arg.get()
-
+    self.assertEqual(options.get_all_options()['redefined_flag'], True)
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/utils/value_provider.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py
deleted file mode 100644
index 7ae8998..0000000
--- a/sdks/python/apache_beam/utils/value_provider.py
+++ /dev/null
@@ -1,108 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A ValueProvider class to implement templates with both statically
-and dynamically provided values.
-"""
-
-from functools import wraps
-
-
-class ValueProvider(object):
-  def is_accessible(self):
-    raise NotImplementedError(
-        'ValueProvider.is_accessible implemented in derived classes'
-    )
-
-  def get(self):
-    raise NotImplementedError(
-        'ValueProvider.get implemented in derived classes'
-    )
-
-
-class StaticValueProvider(ValueProvider):
-  def __init__(self, value_type, value):
-    self.value_type = value_type
-    self.value = value_type(value)
-
-  def is_accessible(self):
-    return True
-
-  def get(self):
-    return self.value
-
-  def __str__(self):
-    return str(self.value)
-
-
-class RuntimeValueProvider(ValueProvider):
-  pipeline_options_dict = None
-
-  def __init__(self, pipeline_options_subclass, option_name,
-               value_type, default_value):
-    self.pipeline_options_subclass = pipeline_options_subclass
-    self.option_name = option_name
-    self.default_value = default_value
-    self.value_type = value_type
-
-  def is_accessible(self):
-    return RuntimeValueProvider.pipeline_options_dict is not None
-
-  def get(self):
-    pipeline_options_dict = RuntimeValueProvider.pipeline_options_dict
-    if pipeline_options_dict is None:
-      raise RuntimeError('%s.get() not called from a runtime context' %self)
-    pipeline_option = (
-        self.pipeline_options_subclass.from_dictionary(pipeline_options_dict)
-        ._visible_options
-        .__dict__
-        .get(self.option_name)
-    )
-    value = (
-        pipeline_option.get()
-        if isinstance(pipeline_option, StaticValueProvider)
-        else self.default_value
-    )
-    return value
-
-  @classmethod
-  def set_runtime_options(cls, pipeline_options):
-    assert RuntimeValueProvider.pipeline_options_dict is None
-    RuntimeValueProvider.pipeline_options_dict = pipeline_options
-
-  def __str__(self):
-    return '%s(option: %s, type: %s, default_value: %s)' % (
-        self.__class__.__name__,
-        self.option_name,
-        self.value_type.__name__,
-        repr(self.default_value)
-    )
-
-
-def check_accessible(value_provider_list):
-  """Check accessibility of a list of ValueProvider objects."""
-  assert isinstance(value_provider_list, list)
-
-  def _check_accessible(fnc):
-    @wraps(fnc)
-    def _f(self, *args, **kwargs):
-      for obj in [getattr(self, vp) for vp in value_provider_list]:
-        if not obj.is_accessible():
-          raise RuntimeError('%s not accessible' % obj)
-      return fnc(self, *args, **kwargs)
-    return _f
-  return _check_accessible


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/utils/value_provider_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider_test.py b/sdks/python/apache_beam/utils/value_provider_test.py
deleted file mode 100644
index 0bc3b1d..0000000
--- a/sdks/python/apache_beam/utils/value_provider_test.py
+++ /dev/null
@@ -1,146 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the ValueProvider class."""
-
-import unittest
-
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.value_provider import RuntimeValueProvider
-from apache_beam.utils.value_provider import StaticValueProvider
-
-
-class ValueProviderTests(unittest.TestCase):
-  def test_static_value_provider_keyword_argument(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This keyword argument is a value provider',
-            default='some value')
-    options = UserDefinedOptions(['--vp_arg', 'abc'])
-    self.assertTrue(isinstance(options.vp_arg, StaticValueProvider))
-    self.assertTrue(options.vp_arg.is_accessible())
-    self.assertEqual(options.vp_arg.get(), 'abc')
-
-  def test_runtime_value_provider_keyword_arguent(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This keyword argument is a value provider')
-    options = UserDefinedOptions()
-    self.assertTrue(isinstance(options.vp_arg, RuntimeValueProvider))
-    self.assertFalse(options.vp_arg.is_accessible())
-    with self.assertRaises(RuntimeError):
-      options.vp_arg.get()
-
-  def test_static_value_provider_positional_argument(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            'vp_pos_arg',
-            help='This positional argument is a value provider',
-            default='some value')
-    options = UserDefinedOptions(['abc'])
-    self.assertTrue(isinstance(options.vp_pos_arg, StaticValueProvider))
-    self.assertTrue(options.vp_pos_arg.is_accessible())
-    self.assertEqual(options.vp_pos_arg.get(), 'abc')
-
-  def test_runtime_value_provider_positional_argument(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            'vp_pos_arg',
-            help='This positional argument is a value provider')
-    options = UserDefinedOptions([])
-    self.assertTrue(isinstance(options.vp_pos_arg, RuntimeValueProvider))
-    self.assertFalse(options.vp_pos_arg.is_accessible())
-    with self.assertRaises(RuntimeError):
-      options.vp_pos_arg.get()
-
-  def test_static_value_provider_type_cast(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            type=int,
-            help='This flag is a value provider')
-
-    options = UserDefinedOptions(['--vp_arg', '123'])
-    self.assertTrue(isinstance(options.vp_arg, StaticValueProvider))
-    self.assertTrue(options.vp_arg.is_accessible())
-    self.assertEqual(options.vp_arg.get(), 123)
-
-  def test_set_runtime_option(self):
-    # define ValueProvider ptions, with and without default values
-    class UserDefinedOptions1(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This keyword argument is a value provider')  # set at runtime
-
-        parser.add_value_provider_argument(       # not set, had default int
-            '-v', '--vp_arg2',                    # with short form
-            default=123,
-            type=int)
-
-        parser.add_value_provider_argument(       # not set, had default str
-            '--vp-arg3',                          # with dash in name
-            default='123',
-            type=str)
-
-        parser.add_value_provider_argument(       # not set and no default
-            '--vp_arg4',
-            type=float)
-
-        parser.add_value_provider_argument(       # positional argument set
-            'vp_pos_arg',                         # default & runtime ignored
-            help='This positional argument is a value provider',
-            type=float,
-            default=5.4)
-
-    # provide values at graph-construction time
-    # (options not provided here become of the type RuntimeValueProvider)
-    options = UserDefinedOptions1(['1.2'])
-    self.assertFalse(options.vp_arg.is_accessible())
-    self.assertFalse(options.vp_arg2.is_accessible())
-    self.assertFalse(options.vp_arg3.is_accessible())
-    self.assertFalse(options.vp_arg4.is_accessible())
-    self.assertTrue(options.vp_pos_arg.is_accessible())
-
-    # provide values at job-execution time
-    # (options not provided here will use their default, if they have one)
-    RuntimeValueProvider.set_runtime_options({'vp_arg': 'abc',
-                                              'vp_pos_arg':'3.2'})
-    # RuntimeValueProvider.set_runtime_options({'vp_arg': 'abc'})
-    self.assertTrue(options.vp_arg.is_accessible())
-    self.assertEqual(options.vp_arg.get(), 'abc')
-    self.assertTrue(options.vp_arg2.is_accessible())
-    self.assertEqual(options.vp_arg2.get(), 123)
-    self.assertTrue(options.vp_arg3.is_accessible())
-    self.assertEqual(options.vp_arg3.get(), '123')
-    self.assertTrue(options.vp_arg4.is_accessible())
-    self.assertIsNone(options.vp_arg4.get())
-    self.assertTrue(options.vp_pos_arg.is_accessible())
-    self.assertEqual(options.vp_pos_arg.get(), 1.2)
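For readers skimming the revert, the deleted value_provider.py above is small enough to summarize in a runnable, Beam-independent sketch. The flat `runtime_options` dict here is a deliberate simplification standing in for the deleted module's pipeline_options_dict / PipelineOptions plumbing; everything else mirrors the deleted code:

# Self-contained sketch of the semantics implemented by the deleted
# value_provider.py quoted above; runs without Beam installed.
from functools import wraps


class StaticValueProvider(object):
  """A value fully known at pipeline-construction time."""

  def __init__(self, value_type, value):
    self.value = value_type(value)

  def is_accessible(self):
    return True

  def get(self):
    return self.value


class RuntimeValueProvider(object):
  """A placeholder that only resolves once runtime options are set."""

  runtime_options = None  # class-level, like pipeline_options_dict above

  def __init__(self, option_name, default_value=None):
    self.option_name = option_name
    self.default_value = default_value

  def is_accessible(self):
    return RuntimeValueProvider.runtime_options is not None

  def get(self):
    if not self.is_accessible():
      raise RuntimeError('%s.get() not called from a runtime context' % self)
    return RuntimeValueProvider.runtime_options.get(
        self.option_name, self.default_value)


def check_accessible(attr_names):
  """Decorator: fail fast if any named ValueProvider attribute is unset."""
  def decorator(fnc):
    @wraps(fnc)
    def wrapper(self, *args, **kwargs):
      for name in attr_names:
        if not getattr(self, name).is_accessible():
          raise RuntimeError('%s not accessible' % name)
      return fnc(self, *args, **kwargs)
    return wrapper
  return decorator


if __name__ == '__main__':
  vp = RuntimeValueProvider('input', default_value='default.txt')
  assert not vp.is_accessible()            # graph-construction time
  RuntimeValueProvider.runtime_options = {'input': 'kinglear.txt'}
  assert vp.get() == 'kinglear.txt'        # job-execution time

This split is what made Dataflow-style templates work: sources and sinks held providers rather than concrete strings, validated them only when accessible, and the check_accessible decorator guarded methods (like FileBasedSource._get_concat_source above) that cannot proceed before the runtime values arrive.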