beam-commits mailing list archives

From al...@apache.org
Subject [1/2] beam git commit: Revert "Add ValueProvider class for FileBasedSource I/O Transforms"
Date Wed, 15 Mar 2017 21:49:52 GMT
Repository: beam
Updated Branches:
  refs/heads/master 88c2fe18d -> 477e789a4


Revert "Add ValueProvider class for FileBasedSource I/O Transforms"

This reverts commit 6d1c9043ab0d929e2cea8a3a68192e47a5cc55cd.
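
For context: ValueProvider let pipeline options be declared at graph-construction time and resolved at runtime, the mechanism behind templated pipelines, and the diff below unwinds that end to end. A minimal sketch of the reverted style, using names taken from the removed wordcount.py hunk below (not valid after this commit):

    class WordcountOptions(PipelineOptions):
      @classmethod
      def _add_argparse_args(cls, parser):
        # add_value_provider_argument made '--input' a runtime-resolvable
        # placeholder; this revert removes the method entirely.
        parser.add_value_provider_argument(
            '--input',
            default='gs://dataflow-samples/shakespeare/kinglear.txt',
            help='Input file to process.')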


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/02454134
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/02454134
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/02454134

Branch: refs/heads/master
Commit: 02454134fbbac2ce28b5b9507a2077ac09348f88
Parents: 88c2fe1
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Wed Mar 15 14:33:19 2017 -0700
Committer: Ahmet Altay <altay@google.com>
Committed: Wed Mar 15 14:49:12 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/examples/wordcount.py   |  31 ++--
 .../apache_beam/internal/gcp/json_value.py      |   6 -
 sdks/python/apache_beam/io/filebasedsource.py   |  46 ++----
 .../apache_beam/io/filebasedsource_test.py      |  28 +---
 sdks/python/apache_beam/io/fileio.py            |  44 ++----
 sdks/python/apache_beam/io/fileio_test.py       |  45 ++----
 sdks/python/apache_beam/transforms/display.py   |   1 -
 .../apache_beam/transforms/display_test.py      |  36 -----
 .../apache_beam/utils/pipeline_options.py       |  81 +---------
 .../apache_beam/utils/pipeline_options_test.py  |  50 +------
 sdks/python/apache_beam/utils/value_provider.py | 108 --------------
 .../apache_beam/utils/value_provider_test.py    | 146 -------------------
 12 files changed, 62 insertions(+), 560 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/examples/wordcount.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/wordcount.py b/sdks/python/apache_beam/examples/wordcount.py
index 9e47e08..50c0328 100644
--- a/sdks/python/apache_beam/examples/wordcount.py
+++ b/sdks/python/apache_beam/examples/wordcount.py
@@ -19,6 +19,7 @@
 
 from __future__ import absolute_import
 
+import argparse
 import logging
 import re
 
@@ -66,28 +67,24 @@ class WordExtractingDoFn(beam.DoFn):
 
 def run(argv=None):
   """Main entry point; defines and runs the wordcount pipeline."""
-  class WordcountOptions(PipelineOptions):
-    @classmethod
-    def _add_argparse_args(cls, parser):
-      parser.add_value_provider_argument(
-          '--input',
-          dest='input',
-          default='gs://dataflow-samples/shakespeare/kinglear.txt',
-          help='Input file to process.')
-      parser.add_value_provider_argument(
-          '--output',
-          dest='output',
-          required=True,
-          help='Output file to write results to.')
-  pipeline_options = PipelineOptions(argv)
-  wordcount_options = pipeline_options.view_as(WordcountOptions)
+  parser = argparse.ArgumentParser()
+  parser.add_argument('--input',
+                      dest='input',
+                      default='gs://dataflow-samples/shakespeare/kinglear.txt',
+                      help='Input file to process.')
+  parser.add_argument('--output',
+                      dest='output',
+                      required=True,
+                      help='Output file to write results to.')
+  known_args, pipeline_args = parser.parse_known_args(argv)
   # We use the save_main_session option because one or more DoFn's in this
   # workflow rely on global context (e.g., a module imported at module level).
+  pipeline_options = PipelineOptions(pipeline_args)
   pipeline_options.view_as(SetupOptions).save_main_session = True
   p = beam.Pipeline(options=pipeline_options)
 
   # Read the text file[pattern] into a PCollection.
-  lines = p | 'read' >> ReadFromText(wordcount_options.input)
+  lines = p | 'read' >> ReadFromText(known_args.input)
 
   # Count the occurrences of each word.
   counts = (lines
@@ -102,7 +99,7 @@ def run(argv=None):
 
   # Write the output using a "Write" transform that has side effects.
   # pylint: disable=expression-not-assigned
-  output | 'write' >> WriteToText(wordcount_options.output)
+  output | 'write' >> WriteToText(known_args.output)
 
   # Actually run the pipeline (all operations above are deferred).
   result = p.run()
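
The example thus returns to stock argparse: flags the example owns are parsed into known_args, and everything else passes through to PipelineOptions. A self-contained sketch of that split (plain argparse, runnable as-is; '--runner' is just an illustrative pass-through flag):

    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--input',
                        default='gs://dataflow-samples/shakespeare/kinglear.txt')
    parser.add_argument('--output', required=True)
    known_args, pipeline_args = parser.parse_known_args(
        ['--output', '/tmp/counts', '--runner', 'DirectRunner'])
    print(known_args.output)  # /tmp/counts
    print(pipeline_args)      # ['--runner', 'DirectRunner']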

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/internal/gcp/json_value.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/gcp/json_value.py b/sdks/python/apache_beam/internal/gcp/json_value.py
index 4099c1a..c8b5393 100644
--- a/sdks/python/apache_beam/internal/gcp/json_value.py
+++ b/sdks/python/apache_beam/internal/gcp/json_value.py
@@ -25,8 +25,6 @@ except ImportError:
   extra_types = None
 # pylint: enable=wrong-import-order, wrong-import-position
 
-from apache_beam.utils.value_provider import ValueProvider
-
 
 _MAXINT64 = (1 << 63) - 1
 _MININT64 = - (1 << 63)
@@ -106,10 +104,6 @@ def to_json_value(obj, with_type=False):
       raise TypeError('Can not encode {} as a 64-bit integer'.format(obj))
   elif isinstance(obj, float):
     return extra_types.JsonValue(double_value=obj)
-  elif isinstance(obj, ValueProvider):
-    if obj.is_accessible():
-      return to_json_value(obj.get())
-    return extra_types.JsonValue(is_null=True)
   else:
     raise TypeError('Cannot convert %s to a JSON value.' % repr(obj))
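
With the ValueProvider branch gone, to_json_value again handles only the primitive cases and raises TypeError for everything else; previously an inaccessible ValueProvider mapped to a JSON null. Illustrative calls, assuming the apitools extra_types dependency the module imports is installed:

    from apache_beam.internal.gcp.json_value import to_json_value

    to_json_value(3.14)      # JsonValue(double_value=3.14)
    to_json_value(object())  # raises TypeError: Cannot convert ... to a JSON value.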
 

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 8c87e99..582d673 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -34,10 +34,6 @@ from apache_beam.io import fileio
 from apache_beam.io import iobase
 from apache_beam.io import range_trackers
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils.value_provider import ValueProvider
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import check_accessible
-
 
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 
@@ -57,8 +53,7 @@ class FileBasedSource(iobase.BoundedSource):
     """Initializes ``FileBasedSource``.
 
     Args:
-      file_pattern: the file glob to read a string or a ValueProvider
-                    (placeholder to inject a runtime value).
+      file_pattern: the file glob to read.
       min_bundle_size: minimum size of bundles that should be generated when
                        performing initial splitting on this source.
       compression_type: compression type to use
@@ -76,19 +71,15 @@ class FileBasedSource(iobase.BoundedSource):
                 creation time.
     Raises:
       TypeError: when compression_type is not valid or if file_pattern is not a
-                 string or a ValueProvider.
+                 string.
       ValueError: when compression and splittable files are specified.
       IOError: when the file pattern specified yields an empty result.
     """
+    if not isinstance(file_pattern, basestring):
+      raise TypeError(
+          '%s: file_pattern must be a string;  got %r instead' %
+          (self.__class__.__name__, file_pattern))
 
-    if (not (isinstance(file_pattern, basestring)
-             or isinstance(file_pattern, ValueProvider))):
-      raise TypeError('%s: file_pattern must be of type string'
-                      ' or ValueProvider; got %r instead'
-                      % (self.__class__.__name__, file_pattern))
-
-    if isinstance(file_pattern, basestring):
-      file_pattern = StaticValueProvider(str, file_pattern)
     self._pattern = file_pattern
     self._concat_source = None
     self._min_bundle_size = min_bundle_size
@@ -102,24 +93,21 @@ class FileBasedSource(iobase.BoundedSource):
     else:
       # We can't split compressed files efficiently so turn off splitting.
       self._splittable = False
-    if validate and file_pattern.is_accessible():
+    if validate:
       self._validate()
 
   def display_data(self):
-    return {'file_pattern': DisplayDataItem(str(self._pattern),
+    return {'file_pattern': DisplayDataItem(self._pattern,
                                             label="File Pattern"),
             'compression': DisplayDataItem(str(self._compression_type),
                                            label='Compression Type')}
 
-  @check_accessible(['_pattern'])
   def _get_concat_source(self):
     if self._concat_source is None:
-      pattern = self._pattern.get()
-
       single_file_sources = []
-      file_names = [f for f in fileio.ChannelFactory.glob(pattern)]
+      file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
       sizes = FileBasedSource._estimate_sizes_of_files(file_names,
-                                                       pattern)
+                                                       self._pattern)
 
       # We create a reference for FileBasedSource that will be serialized along
       # with each _SingleFileSource. To prevent this FileBasedSource from having
@@ -176,16 +164,13 @@ class FileBasedSource(iobase.BoundedSource):
                                                                  file_names)
         return [file_sizes[f] for f in file_names]
 
-  @check_accessible(['_pattern'])
   def _validate(self):
     """Validate if there are actual files in the specified glob pattern
     """
-    pattern = self._pattern.get()
-
     # Limit the responses as we only want to check if something exists
-    if len(fileio.ChannelFactory.glob(pattern, limit=1)) <= 0:
+    if len(fileio.ChannelFactory.glob(self._pattern, limit=1)) <= 0:
       raise IOError(
-          'No files found based on the file pattern %s' % pattern)
+          'No files found based on the file pattern %s' % self._pattern)
 
   def split(
       self, desired_bundle_size=None, start_position=None, stop_position=None):
@@ -194,11 +179,8 @@ class FileBasedSource(iobase.BoundedSource):
         start_position=start_position,
         stop_position=stop_position)
 
-  @check_accessible(['_pattern'])
   def estimate_size(self):
-    pattern = self._pattern.get()
-    file_names = [f for f in fileio.ChannelFactory.glob(pattern)]
-
+    file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
     # We're reading very few files so we can pass names file names to
     # _estimate_sizes_of_files without pattern as otherwise we'll try to do
     # optimization based on the pattern and might end up reading much more
@@ -239,7 +221,7 @@ class FileBasedSource(iobase.BoundedSource):
                             defined by a given ``RangeTracker``.
 
     Returns:
-      an iterator that gives the records read from the given file.
+      a iterator that gives the records read from the given file.
     """
     raise NotImplementedError
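
The net effect on FileBasedSource: file_pattern must again be a plain string, and validation (when enabled) runs eagerly at construction instead of waiting for a ValueProvider to become accessible. A sketch of the post-revert contract (LineSource is the test subclass from filebasedsource_test.py below; paths are illustrative, and validate is assumed to default to True):

    LineSource(123)               # TypeError: file_pattern must be a string
    LineSource('/no/such-*.txt')  # IOError at construction: no files found
    LineSource(file_name)         # ok, as in test_validation_file_exists below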
 

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/io/filebasedsource_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource_test.py b/sdks/python/apache_beam/io/filebasedsource_test.py
index 51097de..7481c4c 100644
--- a/sdks/python/apache_beam/io/filebasedsource_test.py
+++ b/sdks/python/apache_beam/io/filebasedsource_test.py
@@ -43,8 +43,6 @@ from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import RuntimeValueProvider
 
 
 class LineSource(FileBasedSource):
@@ -223,28 +221,6 @@ class TestFileBasedSource(unittest.TestCase):
     # environments with limited amount of resources.
     filebasedsource.MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 2
 
-  def test_string_or_value_provider_only(self):
-    str_file_pattern = tempfile.NamedTemporaryFile(delete=False).name
-    self.assertEqual(str_file_pattern,
-                     FileBasedSource(str_file_pattern)._pattern.value)
-
-    static_vp_file_pattern = StaticValueProvider(value_type=str,
-                                                 value=str_file_pattern)
-    self.assertEqual(static_vp_file_pattern,
-                     FileBasedSource(static_vp_file_pattern)._pattern)
-
-    runtime_vp_file_pattern = RuntimeValueProvider(
-        pipeline_options_subclass=object,
-        option_name='blah',
-        value_type=str,
-        default_value=str_file_pattern)
-    self.assertEqual(runtime_vp_file_pattern,
-                     FileBasedSource(runtime_vp_file_pattern)._pattern)
-
-    invalid_file_pattern = 123
-    with self.assertRaises(TypeError):
-      FileBasedSource(invalid_file_pattern)
-
   def test_validation_file_exists(self):
     file_name, _ = write_data(10)
     LineSource(file_name)
@@ -611,9 +587,7 @@ class TestSingleFileSource(unittest.TestCase):
     dd = DisplayData.create_from(fbs)
     expected_items = [
         DisplayDataItemMatcher('compression', 'auto'),
-        DisplayDataItemMatcher(
-            'file_pattern',
-            file_name)]
+        DisplayDataItemMatcher('file_pattern', file_name)]
     hc.assert_that(dd.items,
                    hc.contains_inanyorder(*expected_items))
 

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 243cda6..49a2082 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -14,7 +14,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
-
 """File-based sources and sinks."""
 
 from __future__ import absolute_import
@@ -32,9 +31,6 @@ import zlib
 from apache_beam.internal import util
 from apache_beam.io import iobase
 from apache_beam.transforms.display import DisplayDataItem
-from apache_beam.utils.value_provider import ValueProvider
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import check_accessible
 
 # TODO(sourabhbajaj): Fix the constant values after the new IO factory
 # Current constants are copy pasted from gcsio.py till we fix this.
@@ -548,30 +544,25 @@ class FileSink(iobase.Sink):
                compression_type=CompressionTypes.AUTO):
     """
      Raises:
-      TypeError: if file path parameters are not a string or ValueProvider,
-                 or if compression_type is not member of CompressionTypes.
+      TypeError: if file path parameters are not a string or if compression_type
+        is not member of CompressionTypes.
       ValueError: if shard_name_template is not of expected format.
     """
-    if not (isinstance(file_path_prefix, basestring)
-            or isinstance(file_path_prefix, ValueProvider)):
-      raise TypeError('file_path_prefix must be a string or ValueProvider;'
-                      'got %r instead' % file_path_prefix)
-    if not (isinstance(file_name_suffix, basestring)
-            or isinstance(file_name_suffix, ValueProvider)):
-      raise TypeError('file_name_suffix must be a string or ValueProvider;'
-                      'got %r instead' % file_name_suffix)
+    if not isinstance(file_path_prefix, basestring):
+      raise TypeError('file_path_prefix must be a string; got %r instead' %
+                      file_path_prefix)
+    if not isinstance(file_name_suffix, basestring):
+      raise TypeError('file_name_suffix must be a string; got %r instead' %
+                      file_name_suffix)
 
     if not CompressionTypes.is_valid_compression_type(compression_type):
       raise TypeError('compression_type must be CompressionType object but '
                       'was %s' % type(compression_type))
+
     if shard_name_template is None:
       shard_name_template = DEFAULT_SHARD_NAME_TEMPLATE
     elif shard_name_template is '':
       num_shards = 1
-    if isinstance(file_path_prefix, basestring):
-      file_path_prefix = StaticValueProvider(str, file_path_prefix)
-    if isinstance(file_name_suffix, basestring):
-      file_name_suffix = StaticValueProvider(str, file_name_suffix)
     self.file_path_prefix = file_path_prefix
     self.file_name_suffix = file_name_suffix
     self.num_shards = num_shards
@@ -627,31 +618,22 @@ class FileSink(iobase.Sink):
     if file_handle is not None:
       file_handle.close()
 
-  @check_accessible(['file_path_prefix', 'file_name_suffix'])
   def initialize_write(self):
-    file_path_prefix = self.file_path_prefix.get()
-    file_name_suffix = self.file_name_suffix.get()
-    tmp_dir = file_path_prefix + file_name_suffix + time.strftime(
+    tmp_dir = self.file_path_prefix + self.file_name_suffix + time.strftime(
         '-temp-%Y-%m-%d_%H-%M-%S')
     ChannelFactory().mkdir(tmp_dir)
     return tmp_dir
 
-  @check_accessible(['file_path_prefix', 'file_name_suffix'])
   def open_writer(self, init_result, uid):
     # A proper suffix is needed for AUTO compression detection.
     # We also ensure there will be no collisions with uid and a
     # (possibly unsharded) file_path_prefix and a (possibly empty)
     # file_name_suffix.
-    file_path_prefix = self.file_path_prefix.get()
-    file_name_suffix = self.file_name_suffix.get()
     suffix = (
-        '.' + os.path.basename(file_path_prefix) + file_name_suffix)
+        '.' + os.path.basename(self.file_path_prefix) + self.file_name_suffix)
     return FileSinkWriter(self, os.path.join(init_result, uid) + suffix)
 
-  @check_accessible(['file_path_prefix', 'file_name_suffix'])
   def finalize_write(self, init_result, writer_results):
-    file_path_prefix = self.file_path_prefix.get()
-    file_name_suffix = self.file_name_suffix.get()
     writer_results = sorted(writer_results)
     num_shards = len(writer_results)
     min_threads = min(num_shards, FileSink._MAX_RENAME_THREADS)
@@ -660,8 +642,8 @@ class FileSink(iobase.Sink):
     rename_ops = []
     for shard_num, shard in enumerate(writer_results):
       final_name = ''.join([
-          file_path_prefix, self.shard_name_format % dict(
-              shard_num=shard_num, num_shards=num_shards), file_name_suffix
+          self.file_path_prefix, self.shard_name_format % dict(
+              shard_num=shard_num, num_shards=num_shards), self.file_name_suffix
       ])
       rename_ops.append((shard, final_name))
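
FileSink's path parameters are plain strings again and are used directly in initialize_write, open_writer, and finalize_write. The shard naming done in finalize_write above reduces to a %-style template; a runnable sketch using the default template string visible in the tests below:

    shard_name_format = '-%(shard_num)05d-of-%(num_shards)05d'
    final_name = ''.join([
        '/tmp/out',                                           # file_path_prefix
        shard_name_format % dict(shard_num=0, num_shards=2),
        '.txt'])                                              # file_name_suffix
    # final_name == '/tmp/out-00000-of-00002.txt'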
 

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 4ca0521..a963c67 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -35,8 +35,6 @@ from apache_beam.test_pipeline import TestPipeline
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 
-from apache_beam.utils.value_provider import StaticValueProvider
-
 
 class TestChannelFactory(unittest.TestCase):
 
@@ -170,7 +168,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
   def test_file_sink_writing(self):
     temp_path = os.path.join(self._new_tempdir(), 'filesink')
     sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
 
     # Manually invoke the generic Sink API.
     init_token = sink.initialize_write()
@@ -191,8 +189,8 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
     res = list(sink.finalize_write(init_token, [res1, res2]))
 
     # Check the results.
-    shard1 = temp_path + '-00000-of-00002.output'
-    shard2 = temp_path + '-00001-of-00002.output'
+    shard1 = temp_path + '-00000-of-00002.foo'
+    shard2 = temp_path + '-00001-of-00002.foo'
     self.assertEqual(res, [shard1, shard2])
     self.assertEqual(open(shard1).read(), '[start][a][b][end]')
     self.assertEqual(open(shard2).read(), '[start][x][y][z][end]')
@@ -203,48 +201,33 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
   def test_file_sink_display_data(self):
     temp_path = os.path.join(self._new_tempdir(), 'display')
     sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
     dd = DisplayData.create_from(sink)
     expected_items = [
         DisplayDataItemMatcher(
             'compression', 'auto'),
         DisplayDataItemMatcher(
             'file_pattern',
-            '{}{}'.format(
-                temp_path,
-                '-%(shard_num)05d-of-%(num_shards)05d.output'))]
+            '{}{}'.format(temp_path,
+                          '-%(shard_num)05d-of-%(num_shards)05d.foo'))]
+
     hc.assert_that(dd.items, hc.contains_inanyorder(*expected_items))
 
   def test_empty_write(self):
     temp_path = tempfile.NamedTemporaryFile().name
     sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder()
-    )
-    p = TestPipeline()
-    p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
-    p.run()
-    self.assertEqual(
-        open(temp_path + '-00000-of-00001.output').read(), '[start][end]')
-
-  def test_static_value_provider_empty_write(self):
-    temp_path = StaticValueProvider(value_type=str,
-                                    value=tempfile.NamedTemporaryFile().name)
-    sink = MyFileSink(
-        temp_path,
-        file_name_suffix=StaticValueProvider(value_type=str, value='.output'),
-        coder=coders.ToStringCoder()
-    )
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
     p = TestPipeline()
     p | beam.Create([]) | beam.io.Write(sink)  # pylint: disable=expression-not-assigned
     p.run()
     self.assertEqual(
-        open(temp_path.get() + '-00000-of-00001.output').read(), '[start][end]')
+        open(temp_path + '-00000-of-00001.foo').read(), '[start][end]')
 
   def test_fixed_shard_write(self):
     temp_path = os.path.join(self._new_tempdir(), 'empty')
     sink = MyFileSink(
         temp_path,
-        file_name_suffix='.output',
+        file_name_suffix='.foo',
         num_shards=3,
         shard_name_template='_NN_SSS_',
         coder=coders.ToStringCoder())
@@ -254,7 +237,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
     p.run()
 
     concat = ''.join(
-        open(temp_path + '_03_%03d_.output' % shard_num).read()
+        open(temp_path + '_03_%03d_.foo' % shard_num).read()
         for shard_num in range(3))
     self.assertTrue('][a][' in concat, concat)
     self.assertTrue('][b][' in concat, concat)
@@ -262,7 +245,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
   def test_file_sink_multi_shards(self):
     temp_path = os.path.join(self._new_tempdir(), 'multishard')
     sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
 
     # Manually invoke the generic Sink API.
     init_token = sink.initialize_write()
@@ -285,7 +268,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
 
     res = sorted(res_second)
     for i in range(num_shards):
-      shard_name = '%s-%05d-of-%05d.output' % (temp_path, i, num_shards)
+      shard_name = '%s-%05d-of-%05d.foo' % (temp_path, i, num_shards)
       uuid = 'uuid-%05d' % i
       self.assertEqual(res[i], shard_name)
       self.assertEqual(
@@ -297,7 +280,7 @@ class TestFileSink(_TestCaseWithTempDirCleanUp):
   def test_file_sink_io_error(self):
     temp_path = os.path.join(self._new_tempdir(), 'ioerror')
     sink = MyFileSink(
-        temp_path, file_name_suffix='.output', coder=coders.ToStringCoder())
+        temp_path, file_name_suffix='.foo', coder=coders.ToStringCoder())
 
     # Manually invoke the generic Sink API.
     init_token = sink.initialize_write()

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/transforms/display.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display.py b/sdks/python/apache_beam/transforms/display.py
index f2ce0fc..2ced1af 100644
--- a/sdks/python/apache_beam/transforms/display.py
+++ b/sdks/python/apache_beam/transforms/display.py
@@ -40,7 +40,6 @@ from datetime import datetime, timedelta
 import inspect
 import json
 
-
 __all__ = ['HasDisplayData', 'DisplayDataItem', 'DisplayData']
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/transforms/display_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/transforms/display_test.py b/sdks/python/apache_beam/transforms/display_test.py
index 7d1130b..5e106e5 100644
--- a/sdks/python/apache_beam/transforms/display_test.py
+++ b/sdks/python/apache_beam/transforms/display_test.py
@@ -114,42 +114,6 @@ class DisplayDataTest(unittest.TestCase):
     with self.assertRaises(ValueError):
       DisplayData.create_from_options(MyDisplayComponent())
 
-  def test_value_provider_display_data(self):
-    class TestOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--int_flag',
-            type=int,
-            help='int_flag description')
-        parser.add_value_provider_argument(
-            '--str_flag',
-            type=str,
-            default='hello',
-            help='str_flag description')
-        parser.add_value_provider_argument(
-            '--float_flag',
-            type=float,
-            help='float_flag description')
-    options = TestOptions(['--int_flag', '1'])
-    items = DisplayData.create_from_options(options).items
-    expected_items = [
-        DisplayDataItemMatcher(
-            'int_flag',
-            '1'),
-        DisplayDataItemMatcher(
-            'str_flag',
-            'RuntimeValueProvider(option: str_flag,'
-            ' type: str, default_value: \'hello\')'
-        ),
-        DisplayDataItemMatcher(
-            'float_flag',
-            'RuntimeValueProvider(option: float_flag,'
-            ' type: float, default_value: None)'
-        )
-    ]
-    hc.assert_that(items, hc.contains_inanyorder(*expected_items))
-
   def test_create_list_display_data(self):
     flags = ['--extra_package', 'package1', '--extra_package', 'package2']
     pipeline_options = PipelineOptions(flags=flags)
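
The removed test above pinned the string rendering of a RuntimeValueProvider in display data; it matches __str__ in the deleted value_provider.py further below:

    str(RuntimeValueProvider(pipeline_options_subclass=object,
                             option_name='str_flag',
                             value_type=str,
                             default_value='hello'))
    # -> "RuntimeValueProvider(option: str_flag, type: str, default_value: 'hello')"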

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/utils/pipeline_options.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options.py b/sdks/python/apache_beam/utils/pipeline_options.py
index 3b5f973..d07c328 100644
--- a/sdks/python/apache_beam/utils/pipeline_options.py
+++ b/sdks/python/apache_beam/utils/pipeline_options.py
@@ -20,75 +20,6 @@
 import argparse
 
 from apache_beam.transforms.display import HasDisplayData
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import RuntimeValueProvider
-from apache_beam.utils.value_provider import ValueProvider
-
-
-def _static_value_provider_of(value_type):
-  """"Helper function to plug a ValueProvider into argparse.
-
-  Args:
-    value_type: the type of the value. Since the type param of argparse's
-                add_argument will always be ValueProvider, we need to
-                preserve the type of the actual value.
-  Returns:
-    A partially constructed StaticValueProvider in the form of a function.
-
-  """
-  def _f(value):
-    _f.func_name = value_type.__name__
-    return StaticValueProvider(value_type, value)
-  return _f
-
-
-class BeamArgumentParser(argparse.ArgumentParser):
-  """An ArgumentParser that supports ValueProvider options.
-
-  Example Usage::
-
-    class TemplateUserOptions(PipelineOptions):
-      @classmethod
-
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument('--vp-arg1', default='start')
-        parser.add_value_provider_argument('--vp-arg2')
-        parser.add_argument('--non-vp-arg')
-
-  """
-  def add_value_provider_argument(self, *args, **kwargs):
-    """ValueProvider arguments can be either of type keyword or positional.
-    At runtime, even positional arguments will need to be supplied in the
-    key/value form.
-    """
-    # Extract the option name from positional argument ['pos_arg']
-    assert args != () and len(args[0]) >= 1
-    if args[0][0] != '-':
-      option_name = args[0]
-      if kwargs.get('nargs') is None:  # make them optionally templated
-        kwargs['nargs'] = '?'
-    else:
-      # or keyword arguments like [--kw_arg, -k, -w] or [--kw-arg]
-      option_name = [i.replace('--', '') for i in args if i[:2] == '--'][0]
-
-    # reassign the type to make room for using
-    # StaticValueProvider as the type for add_argument
-    value_type = kwargs.get('type') or str
-    kwargs['type'] = _static_value_provider_of(value_type)
-
-    # reassign default to default_value to make room for using
-    # RuntimeValueProvider as the default for add_argument
-    default_value = kwargs.get('default')
-    kwargs['default'] = RuntimeValueProvider(
-        pipeline_options_subclass=(self.pipeline_options_subclass
-                                   or PipelineOptions),
-        option_name=option_name,
-        value_type=value_type,
-        default_value=default_value,
-    )
-
-    # have add_argument do most of the work
-    self.add_argument(*args, **kwargs)
 
 
 class PipelineOptions(HasDisplayData):
@@ -136,13 +67,11 @@ class PipelineOptions(HasDisplayData):
     """
     self._flags = flags
     self._all_options = kwargs
-    parser = BeamArgumentParser()
-
+    parser = argparse.ArgumentParser()
     for cls in type(self).mro():
       if cls == PipelineOptions:
         break
       elif '_add_argparse_args' in cls.__dict__:
-        parser.pipeline_options_subclass = cls
         cls._add_argparse_args(parser)
     # The _visible_options attribute will contain only those options from the
     # flags (i.e., command line) that can be recognized. The _all_options
@@ -190,13 +119,13 @@ class PipelineOptions(HasDisplayData):
 
     # TODO(BEAM-1319): PipelineOption sub-classes in the main session might be
     # repeated. Pick last unique instance of each subclass to avoid conflicts.
+    parser = argparse.ArgumentParser()
     subset = {}
-    parser = BeamArgumentParser()
     for cls in PipelineOptions.__subclasses__():
       subset[str(cls)] = cls
     for cls in subset.values():
-      parser.pipeline_options_subclass = cls
       cls._add_argparse_args(parser)  # pylint: disable=protected-access
+
     known_args, _ = parser.parse_known_args(self._flags)
     result = vars(known_args)
 
@@ -204,9 +133,7 @@ class PipelineOptions(HasDisplayData):
     for k in result.keys():
       if k in self._all_options:
         result[k] = self._all_options[k]
-      if (drop_default and
-          parser.get_default(k) == result[k] and
-          not isinstance(parser.get_default(k), ValueProvider)):
+      if drop_default and parser.get_default(k) == result[k]:
         del result[k]
 
     return result
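
The deleted BeamArgumentParser worked by rewriting two argparse knobs: type= was wrapped so a supplied flag parsed into a StaticValueProvider, and default= was replaced with a RuntimeValueProvider carrying the declared type and original default. Reconstructed behavior, condensed from the deleted test in pipeline_options_test.py below (UserOptions as declared there; not valid after this commit):

    options = UserOptions(['--vp_arg', 'hello'])
    options.vp_arg      # StaticValueProvider; .get() == 'hello'
    options.vp_arg2     # RuntimeValueProvider (flag not supplied; default_value=1)
    options.non_vp_arg  # plain int 1: ordinary add_argument is untouched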

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/utils/pipeline_options_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/pipeline_options_test.py b/sdks/python/apache_beam/utils/pipeline_options_test.py
index f1df6a1..507a827 100644
--- a/sdks/python/apache_beam/utils/pipeline_options_test.py
+++ b/sdks/python/apache_beam/utils/pipeline_options_test.py
@@ -24,8 +24,6 @@ import hamcrest as hc
 from apache_beam.transforms.display import DisplayData
 from apache_beam.transforms.display_test import DisplayDataItemMatcher
 from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.value_provider import StaticValueProvider
-from apache_beam.utils.value_provider import RuntimeValueProvider
 
 
 class PipelineOptionsTest(unittest.TestCase):
@@ -133,7 +131,7 @@ class PipelineOptionsTest(unittest.TestCase):
 
     options.view_as(PipelineOptionsTest.MockOptions).mock_flag = True
     self.assertEqual(options.get_all_options()['num_workers'], 5)
-    self.assertTrue(options.get_all_options()['mock_flag'])
+    self.assertEqual(options.get_all_options()['mock_flag'], True)
 
   def test_experiments(self):
     options = PipelineOptions(['--experiment', 'abc', '--experiment', 'def'])
@@ -187,51 +185,7 @@ class PipelineOptionsTest(unittest.TestCase):
         parser.add_argument('--redefined_flag', action='store_true')
 
     options = PipelineOptions(['--redefined_flag'])
-    self.assertTrue(options.get_all_options()['redefined_flag'])
-
-  def test_value_provider_options(self):
-    class UserOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This flag is a value provider')
-
-        parser.add_value_provider_argument(
-            '--vp_arg2',
-            default=1,
-            type=int)
-
-        parser.add_argument(
-            '--non_vp_arg',
-            default=1,
-            type=int
-        )
-
-    # Provide values: if not provided, the option becomes of the type runtime vp
-    options = UserOptions(['--vp_arg', 'hello'])
-    self.assertIsInstance(options.vp_arg, StaticValueProvider)
-    self.assertIsInstance(options.vp_arg2, RuntimeValueProvider)
-    self.assertIsInstance(options.non_vp_arg, int)
-
-    # Values can be overwritten
-    options = UserOptions(vp_arg=5,
-                          vp_arg2=StaticValueProvider(value_type=str,
-                                                      value='bye'),
-                          non_vp_arg=RuntimeValueProvider(
-                              pipeline_options_subclass=UserOptions,
-                              option_name='foo',
-                              value_type=int,
-                              default_value=10))
-    self.assertEqual(options.vp_arg, 5)
-    self.assertTrue(options.vp_arg2.is_accessible(),
-                    '%s is not accessible' % options.vp_arg2)
-    self.assertEqual(options.vp_arg2.get(), 'bye')
-    self.assertFalse(options.non_vp_arg.is_accessible())
-
-    with self.assertRaises(RuntimeError):
-      options.non_vp_arg.get()
-
+    self.assertEqual(options.get_all_options()['redefined_flag'], True)
 
 if __name__ == '__main__':
   logging.getLogger().setLevel(logging.INFO)

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/utils/value_provider.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider.py b/sdks/python/apache_beam/utils/value_provider.py
deleted file mode 100644
index 7ae8998..0000000
--- a/sdks/python/apache_beam/utils/value_provider.py
+++ /dev/null
@@ -1,108 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""A ValueProvider class to implement templates with both statically
-and dynamically provided values.
-"""
-
-from functools import wraps
-
-
-class ValueProvider(object):
-  def is_accessible(self):
-    raise NotImplementedError(
-        'ValueProvider.is_accessible implemented in derived classes'
-    )
-
-  def get(self):
-    raise NotImplementedError(
-        'ValueProvider.get implemented in derived classes'
-    )
-
-
-class StaticValueProvider(ValueProvider):
-  def __init__(self, value_type, value):
-    self.value_type = value_type
-    self.value = value_type(value)
-
-  def is_accessible(self):
-    return True
-
-  def get(self):
-    return self.value
-
-  def __str__(self):
-    return str(self.value)
-
-
-class RuntimeValueProvider(ValueProvider):
-  pipeline_options_dict = None
-
-  def __init__(self, pipeline_options_subclass, option_name,
-               value_type, default_value):
-    self.pipeline_options_subclass = pipeline_options_subclass
-    self.option_name = option_name
-    self.default_value = default_value
-    self.value_type = value_type
-
-  def is_accessible(self):
-    return RuntimeValueProvider.pipeline_options_dict is not None
-
-  def get(self):
-    pipeline_options_dict = RuntimeValueProvider.pipeline_options_dict
-    if pipeline_options_dict is None:
-      raise RuntimeError('%s.get() not called from a runtime context' %self)
-    pipeline_option = (
-        self.pipeline_options_subclass.from_dictionary(pipeline_options_dict)
-        ._visible_options
-        .__dict__
-        .get(self.option_name)
-    )
-    value = (
-        pipeline_option.get()
-        if isinstance(pipeline_option, StaticValueProvider)
-        else self.default_value
-    )
-    return value
-
-  @classmethod
-  def set_runtime_options(cls, pipeline_options):
-    assert RuntimeValueProvider.pipeline_options_dict is None
-    RuntimeValueProvider.pipeline_options_dict = pipeline_options
-
-  def __str__(self):
-    return '%s(option: %s, type: %s, default_value: %s)' % (
-        self.__class__.__name__,
-        self.option_name,
-        self.value_type.__name__,
-        repr(self.default_value)
-    )
-
-
-def check_accessible(value_provider_list):
-  """Check accessibility of a list of ValueProvider objects."""
-  assert isinstance(value_provider_list, list)
-
-  def _check_accessible(fnc):
-    @wraps(fnc)
-    def _f(self, *args, **kwargs):
-      for obj in [getattr(self, vp) for vp in value_provider_list]:
-        if not obj.is_accessible():
-          raise RuntimeError('%s not accessible' % obj)
-      return fnc(self, *args, **kwargs)
-    return _f
-  return _check_accessible
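
For reference, the deleted check_accessible decorator guarded any method that dereferences ValueProvider attributes, raising RuntimeError before the body runs if one is not yet accessible. A usage sketch built only from the deleted source above (the Reader class is hypothetical):

    class Reader(object):
      def __init__(self, pattern):
        # A StaticValueProvider is always accessible, so read() succeeds.
        self._pattern = StaticValueProvider(str, pattern)

      @check_accessible(['_pattern'])
      def read(self):
        return self._pattern.get()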

http://git-wip-us.apache.org/repos/asf/beam/blob/02454134/sdks/python/apache_beam/utils/value_provider_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/utils/value_provider_test.py b/sdks/python/apache_beam/utils/value_provider_test.py
deleted file mode 100644
index 0bc3b1d..0000000
--- a/sdks/python/apache_beam/utils/value_provider_test.py
+++ /dev/null
@@ -1,146 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-"""Unit tests for the ValueProvider class."""
-
-import unittest
-
-from apache_beam.utils.pipeline_options import PipelineOptions
-from apache_beam.utils.value_provider import RuntimeValueProvider
-from apache_beam.utils.value_provider import StaticValueProvider
-
-
-class ValueProviderTests(unittest.TestCase):
-  def test_static_value_provider_keyword_argument(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This keyword argument is a value provider',
-            default='some value')
-    options = UserDefinedOptions(['--vp_arg', 'abc'])
-    self.assertTrue(isinstance(options.vp_arg, StaticValueProvider))
-    self.assertTrue(options.vp_arg.is_accessible())
-    self.assertEqual(options.vp_arg.get(), 'abc')
-
-  def test_runtime_value_provider_keyword_arguent(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This keyword argument is a value provider')
-    options = UserDefinedOptions()
-    self.assertTrue(isinstance(options.vp_arg, RuntimeValueProvider))
-    self.assertFalse(options.vp_arg.is_accessible())
-    with self.assertRaises(RuntimeError):
-      options.vp_arg.get()
-
-  def test_static_value_provider_positional_argument(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            'vp_pos_arg',
-            help='This positional argument is a value provider',
-            default='some value')
-    options = UserDefinedOptions(['abc'])
-    self.assertTrue(isinstance(options.vp_pos_arg, StaticValueProvider))
-    self.assertTrue(options.vp_pos_arg.is_accessible())
-    self.assertEqual(options.vp_pos_arg.get(), 'abc')
-
-  def test_runtime_value_provider_positional_argument(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            'vp_pos_arg',
-            help='This positional argument is a value provider')
-    options = UserDefinedOptions([])
-    self.assertTrue(isinstance(options.vp_pos_arg, RuntimeValueProvider))
-    self.assertFalse(options.vp_pos_arg.is_accessible())
-    with self.assertRaises(RuntimeError):
-      options.vp_pos_arg.get()
-
-  def test_static_value_provider_type_cast(self):
-    class UserDefinedOptions(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            type=int,
-            help='This flag is a value provider')
-
-    options = UserDefinedOptions(['--vp_arg', '123'])
-    self.assertTrue(isinstance(options.vp_arg, StaticValueProvider))
-    self.assertTrue(options.vp_arg.is_accessible())
-    self.assertEqual(options.vp_arg.get(), 123)
-
-  def test_set_runtime_option(self):
-    # define ValueProvider ptions, with and without default values
-    class UserDefinedOptions1(PipelineOptions):
-      @classmethod
-      def _add_argparse_args(cls, parser):
-        parser.add_value_provider_argument(
-            '--vp_arg',
-            help='This keyword argument is a value provider')   # set at runtime
-
-        parser.add_value_provider_argument(         # not set, had default int
-            '-v', '--vp_arg2',                      # with short form
-            default=123,
-            type=int)
-
-        parser.add_value_provider_argument(         # not set, had default str
-            '--vp-arg3',                            # with dash in name
-            default='123',
-            type=str)
-
-        parser.add_value_provider_argument(         # not set and no default
-            '--vp_arg4',
-            type=float)
-
-        parser.add_value_provider_argument(         # positional argument set
-            'vp_pos_arg',                           # default & runtime ignored
-            help='This positional argument is a value provider',
-            type=float,
-            default=5.4)
-
-    # provide values at graph-construction time
-    # (options not provided here become of the type RuntimeValueProvider)
-    options = UserDefinedOptions1(['1.2'])
-    self.assertFalse(options.vp_arg.is_accessible())
-    self.assertFalse(options.vp_arg2.is_accessible())
-    self.assertFalse(options.vp_arg3.is_accessible())
-    self.assertFalse(options.vp_arg4.is_accessible())
-    self.assertTrue(options.vp_pos_arg.is_accessible())
-
-    # provide values at job-execution time
-    # (options not provided here will use their default, if they have one)
-    RuntimeValueProvider.set_runtime_options({'vp_arg': 'abc',
-                                              'vp_pos_arg':'3.2'})
-    # RuntimeValueProvider.set_runtime_options({'vp_arg': 'abc'})
-    self.assertTrue(options.vp_arg.is_accessible())
-    self.assertEqual(options.vp_arg.get(), 'abc')
-    self.assertTrue(options.vp_arg2.is_accessible())
-    self.assertEqual(options.vp_arg2.get(), 123)
-    self.assertTrue(options.vp_arg3.is_accessible())
-    self.assertEqual(options.vp_arg3.get(), '123')
-    self.assertTrue(options.vp_arg4.is_accessible())
-    self.assertIsNone(options.vp_arg4.get())
-    self.assertTrue(options.vp_pos_arg.is_accessible())
-    self.assertEqual(options.vp_pos_arg.get(), 1.2)
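
The deleted test above also documents the RuntimeValueProvider lifecycle: options not supplied at graph-construction time stay inaccessible until set_runtime_options installs the class-level, set-once options dict, after which get() resolves supplied values and falls back to declared defaults. Condensed from the assertions above (UserDefinedOptions1 as declared there):

    options = UserDefinedOptions1(['1.2'])        # only the positional arg set
    options.vp_arg.is_accessible()                # False before runtime
    RuntimeValueProvider.set_runtime_options({'vp_arg': 'abc'})
    options.vp_arg.get()                          # 'abc' (supplied at runtime)
    options.vp_arg2.get()                         # 123 (falls back to default)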

