beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [1/2] beam git commit: Updates snippets to use Beam text source and sink.
Date Thu, 12 Jan 2017 20:15:31 GMT
Repository: beam
Updated Branches:
  refs/heads/python-sdk 86d420376 -> 4ba0b60a8


Updates snippets to use Beam text source and sink.

Removes the dependency snippets_test has on dataflow native text sink.


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/30a01845
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/30a01845
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/30a01845

Branch: refs/heads/python-sdk
Commit: 30a018458f51a70c0e0d6e5431b219157af8a350
Parents: 86d4203
Author: Chamikara Jayalath <chamikara@google.com>
Authored: Wed Jan 11 17:50:02 2017 -0800
Committer: Chamikara Jayalath <chamikara@google.com>
Committed: Wed Jan 11 17:50:18 2017 -0800

----------------------------------------------------------------------
 .../examples/cookbook/custom_ptransform.py      |   4 +-
 .../apache_beam/examples/snippets/snippets.py   | 100 +++++++++----------
 .../examples/snippets/snippets_test.py          |  96 ++++++++++++++++--
 3 files changed, 136 insertions(+), 64 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/30a01845/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
index ef6bc5a..cfbb99d 100644
--- a/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
+++ b/sdks/python/apache_beam/examples/cookbook/custom_ptransform.py
@@ -48,9 +48,9 @@ def run_count1(known_args, options):
   """Runs the first example pipeline."""
   logging.info('Running first pipeline')
   p = beam.Pipeline(options=options)
-  (p | beam.io.Read(beam.io.TextFileSource(known_args.input))
+  (p | beam.io.ReadFromText(known_args.input)
    | Count1()
-   | beam.io.Write(beam.io.TextFileSink(known_args.output)))
+   | beam.io.WriteToText(known_args.output))
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/30a01845/sdks/python/apache_beam/examples/snippets/snippets.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets.py b/sdks/python/apache_beam/examples/snippets/snippets.py
index 0d55125..e467353 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets.py
@@ -48,11 +48,15 @@ class SnippetUtils(object):
   from apache_beam.pipeline import PipelineVisitor
 
   class RenameFiles(PipelineVisitor):
-    """RenameFiles will rewire source and sink for unit testing.
+    """RenameFiles will rewire read/write paths for unit testing.
 
-    RenameFiles will rewire the GCS files specified in the source and
-    sink in the snippet pipeline to local files so the pipeline can be run as a
-    unit test. This is as close as we can get to have code snippets that are
+    RenameFiles will replace the GCS files specified in the read and
+    write transforms with local files so the pipeline can be run as a
+    unit test. This assumes that the read and write transforms defined in
+    snippets have already been replaced by the transforms
+    'DummyReadForTesting' and 'DummyWriteForTesting' (see snippets_test.py).
+
+    This is as close as we can get to having code snippets that are
     executed and are also ready to be presented in webdocs.
     """
 
@@ -60,14 +64,10 @@ class SnippetUtils(object):
       self.renames = renames
 
     def visit_transform(self, transform_node):
-      if hasattr(transform_node.transform, 'source'):
-        source = transform_node.transform.source
-        source.file_path = self.renames['read']
-        source.is_gcs_source = False
-      elif hasattr(transform_node.transform, 'sink'):
-        sink = transform_node.transform.sink
-        sink.file_path = self.renames['write']
-        sink.is_gcs_sink = False
+      if transform_node.full_label.find('DummyReadForTesting') >= 0:
+        transform_node.transform.fn.file_to_read = self.renames['read']
+      elif transform_node.full_label.find('DummyWriteForTesting') >= 0:
+        transform_node.transform.fn.file_to_write = self.renames['write']
 
 
 def construct_pipeline(renames):
@@ -94,8 +94,7 @@ def construct_pipeline(renames):
   # [END pipelines_constructing_creating]
 
   # [START pipelines_constructing_reading]
-  lines = p | beam.io.Read('ReadMyFile',
-                           beam.io.TextFileSource('gs://some/inputData.txt'))
+  lines = p | 'ReadMyFile' >> beam.io.ReadFromText('gs://some/inputData.txt')
   # [END pipelines_constructing_reading]
 
   # [START pipelines_constructing_applying]
@@ -105,8 +104,8 @@ def construct_pipeline(renames):
 
   # [START pipelines_constructing_writing]
   filtered_words = reversed_words | 'FilterWords' >> beam.Filter(filter_words)
-  filtered_words | 'WriteMyFile' >> beam.io.Write(
-      beam.io.TextFileSink('gs://some/outputData.txt'))
+  filtered_words | 'WriteMyFile' >> beam.io.WriteToText(
+      'gs://some/outputData.txt')
   # [END pipelines_constructing_writing]
 
   p.visit(SnippetUtils.RenameFiles(renames))
@@ -147,10 +146,11 @@ def model_pipelines(argv):
   p = beam.Pipeline(options=pipeline_options)
 
   (p
-   | beam.io.Read(beam.io.TextFileSource(my_options.input))
+   | beam.io.ReadFromText(my_options.input)
    | beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
-   | beam.Map(lambda x: (x, 1)) | beam.combiners.Count.PerKey()
-   | beam.io.Write(beam.io.TextFileSink(my_options.output)))
+   | beam.Map(lambda x: (x, 1))
+   | beam.combiners.Count.PerKey()
+   | beam.io.WriteToText(my_options.output))
 
   p.run()
   # [END model_pipelines]
@@ -184,7 +184,7 @@ def model_pcollection(argv):
        'Whether \'tis nobler in the mind to suffer ',
        'The slings and arrows of outrageous fortune, ',
        'Or to take arms against a sea of troubles, '])
-   | beam.io.Write(beam.io.TextFileSink(my_options.output)))
+   | beam.io.WriteToText(my_options.output))
 
   p.run()
   # [END model_pcollection]
@@ -241,8 +241,8 @@ def pipeline_options_remote(argv):
   options.view_as(StandardOptions).runner = 'DirectRunner'
   p = Pipeline(options=options)
 
-  lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
-  lines | beam.io.Write(beam.io.TextFileSink(my_output))
+  lines = p | beam.io.ReadFromText(my_input)
+  lines | beam.io.WriteToText(my_output)
 
   p.run()
 
@@ -282,8 +282,8 @@ def pipeline_options_local(argv):
   p = Pipeline(options=options)
   # [END pipeline_options_local]
 
-  lines = p | beam.io.Read(beam.io.TextFileSource(my_input))
-  lines | beam.io.Write(beam.io.TextFileSink(my_output))
+  lines = p | beam.io.ReadFromText(my_input)
+  lines | beam.io.WriteToText(my_output)
   p.run()
 
 
@@ -304,9 +304,8 @@ def pipeline_options_command_line(argv):
 
   # Create the Pipeline with remaining arguments.
   p = beam.Pipeline(argv=pipeline_args)
-  lines = p | beam.io.Read('ReadFromText',
-                           beam.io.TextFileSource(known_args.input))
-  lines | beam.io.Write(beam.io.TextFileSink(known_args.output))
+  lines = p | 'ReadFromText' >> beam.io.ReadFromText(known_args.input)
+  lines | 'WriteToText' >> beam.io.WriteToText(known_args.output)
   # [END pipeline_options_command_line]
 
   p.run()
@@ -344,7 +343,7 @@ def pipeline_logging(lines, output):
   (p
    | beam.Create(lines)
    | beam.ParDo(ExtractWordsFn())
-   | beam.io.Write(beam.io.TextFileSink(output)))
+   | beam.io.WriteToText(output))
 
   p.run()
 
@@ -404,11 +403,11 @@ def pipeline_monitoring(renames):
   # [START pipeline_monitoring_execution]
   (p
    # Read the lines of the input text.
-   | 'ReadLines' >> beam.io.Read(beam.io.TextFileSource(options.input))
+   | 'ReadLines' >> beam.io.ReadFromText(options.input)
    # Count the words.
    | CountWords()
    # Write the formatted word counts to output.
-   | 'WriteCounts' >> beam.io.Write(beam.io.TextFileSink(options.output)))
+   | 'WriteCounts' >> beam.io.WriteToText(options.output))
   # [END pipeline_monitoring_execution]
 
   p.visit(SnippetUtils.RenameFiles(renames))
@@ -448,8 +447,8 @@ def examples_wordcount_minimal(renames):
 
   (
       # [START examples_wordcount_minimal_read]
-      p | beam.io.Read(beam.io.TextFileSource(
-          'gs://dataflow-samples/shakespeare/kinglear.txt'))
+      p | beam.io.ReadFromText(
+          'gs://dataflow-samples/shakespeare/kinglear.txt')
       # [END examples_wordcount_minimal_read]
 
       # [START examples_wordcount_minimal_pardo]
@@ -465,7 +464,7 @@ def examples_wordcount_minimal(renames):
       # [END examples_wordcount_minimal_map]
 
       # [START examples_wordcount_minimal_write]
-      | beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+      | beam.io.WriteToText('gs://my-bucket/counts.txt')
       # [END examples_wordcount_minimal_write]
   )
 
@@ -502,8 +501,8 @@ def examples_wordcount_wordcount(renames):
   p = beam.Pipeline(options=options)
   # [END examples_wordcount_wordcount_options]
 
-  lines = p | beam.io.Read(beam.io.TextFileSource(
-      'gs://dataflow-samples/shakespeare/kinglear.txt'))
+  lines = p | beam.io.ReadFromText(
+      'gs://dataflow-samples/shakespeare/kinglear.txt')
 
   # [START examples_wordcount_wordcount_composite]
   class CountWords(beam.PTransform):
@@ -530,7 +529,7 @@ def examples_wordcount_wordcount(renames):
   formatted = counts | beam.ParDo(FormatAsTextFn())
   # [END examples_wordcount_wordcount_dofn]
 
-  formatted |  beam.io.Write(beam.io.TextFileSink('gs://my-bucket/counts.txt'))
+  formatted |  beam.io.WriteToText('gs://my-bucket/counts.txt')
   p.visit(SnippetUtils.RenameFiles(renames))
   p.run()
 
@@ -588,8 +587,8 @@ def examples_wordcount_debugging(renames):
   p = beam.Pipeline(options=PipelineOptions())
   filtered_words = (
       p
-      | beam.io.Read(beam.io.TextFileSource(
-          'gs://dataflow-samples/shakespeare/kinglear.txt'))
+      | beam.io.ReadFromText(
+          'gs://dataflow-samples/shakespeare/kinglear.txt')
       | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
       | beam.combiners.Count.PerElement()
       | 'FilterText' >> beam.ParDo(FilterTextFn('Flourish|stomach')))
@@ -601,8 +600,7 @@ def examples_wordcount_debugging(renames):
 
   output = (filtered_words
             | 'format' >> beam.Map(lambda (word, c): '%s: %s' % (word, c))
-            | beam.io.Write(
-                'write', beam.io.TextFileSink('gs://my-bucket/counts.txt')))
+            | 'Write' >> beam.io.WriteToText('gs://my-bucket/counts.txt'))
 
   p.visit(SnippetUtils.RenameFiles(renames))
   p.run()
@@ -872,18 +870,16 @@ def model_textio(renames):
   # [START model_textio_read]
   p = beam.Pipeline(options=PipelineOptions())
   # [START model_pipelineio_read]
-  lines = p | beam.io.Read(
-      'ReadFromText',
-      beam.io.TextFileSource('gs://my_bucket/path/to/input-*.csv'))
+  lines = p | 'ReadFromText' >> beam.io.ReadFromText(
+      'gs://my_bucket/path/to/input-*.csv')
   # [END model_pipelineio_read]
   # [END model_textio_read]
 
   # [START model_textio_write]
   filtered_words = lines | 'FilterWords' >> beam.FlatMap(filter_words)
   # [START model_pipelineio_write]
-  filtered_words | beam.io.Write(
-      'WriteToText', beam.io.TextFileSink('gs://my_bucket/path/to/numbers',
-                                          file_name_suffix='.csv'))
+  filtered_words | 'WriteToText' >> beam.io.WriteToText(
+      'gs://my_bucket/path/to/numbers', file_name_suffix='.csv')
   # [END model_pipelineio_write]
   # [END model_textio_write]
 
@@ -1014,7 +1010,7 @@ def model_composite_transform_example(contents, output_path):
   (p
    | beam.Create(contents)
    | CountWords()
-   | beam.io.Write(beam.io.TextFileSink(output_path)))
+   | beam.io.WriteToText(output_path))
   p.run()
 
 
@@ -1050,7 +1046,7 @@ def model_multiple_pcollections_flatten(contents, output_path):
       # A list of tuples can be "piped" directly into a Flatten transform.
       | beam.Flatten())
   # [END model_multiple_pcollections_flatten]
-  merged | beam.io.Write(beam.io.TextFileSink(output_path))
+  merged | beam.io.WriteToText(output_path)
 
   p.run()
 
@@ -1083,7 +1079,7 @@ def model_multiple_pcollections_partition(contents, output_path):
 
   ([by_decile[d] for d in xrange(10) if d != 4] + [fortieth_percentile]
    | beam.Flatten()
-   | beam.io.Write(beam.io.TextFileSink(output_path)))
+   | beam.io.WriteToText(output_path))
 
   p.run()
 
@@ -1113,7 +1109,7 @@ def model_group_by_key(contents, output_path):
   # [END model_group_by_key_transform]
   (grouped_words
    | 'count words' >> beam.Map(lambda (word, counts): (word, len(counts)))
-   | beam.io.Write(beam.io.TextFileSink(output_path)))
+   | beam.io.WriteToText(output_path))
   p.run()
 
 
@@ -1151,7 +1147,7 @@ def model_co_group_by_key_tuple(email_list, phone_list, output_path):
 
   contact_lines = result | beam.Map(join_info)
   # [END model_group_by_key_cogroupbykey_tuple]
-  contact_lines | beam.io.Write(beam.io.TextFileSink(output_path))
+  contact_lines | beam.io.WriteToText(output_path)
   p.run()
 
 
@@ -1190,7 +1186,7 @@ def model_join_using_side_inputs(
   contact_lines = names | beam.core.Map(
       "CreateContacts", join_info, AsIter(emails), AsIter(phones))
   # [END model_join_using_side_inputs]
-  contact_lines | beam.io.Write(beam.io.TextFileSink(output_path))
+  contact_lines | beam.io.WriteToText(output_path)
   p.run()
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/30a01845/sdks/python/apache_beam/examples/snippets/snippets_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/examples/snippets/snippets_test.py b/sdks/python/apache_beam/examples/snippets/snippets_test.py
index 1a84a6e..a43e1e0 100644
--- a/sdks/python/apache_beam/examples/snippets/snippets_test.py
+++ b/sdks/python/apache_beam/examples/snippets/snippets_test.py
@@ -23,12 +23,12 @@ import os
 import sys
 import tempfile
 import unittest
+import uuid
 
 import apache_beam as beam
-from apache_beam import io
+from apache_beam import coders
 from apache_beam import pvalue
 from apache_beam import typehints
-from apache_beam.io import fileio
 from apache_beam.transforms.util import assert_that
 from apache_beam.transforms.util import equal_to
 from apache_beam.utils.pipeline_options import TypeOptions
@@ -36,9 +36,6 @@ from apache_beam.examples.snippets import snippets
 
 # pylint: disable=expression-not-assigned
 
-# Monky-patch to use native sink for file path re-writing.
-io.TextFileSink = fileio.NativeTextFileSink
-
 
 class ParDoTest(unittest.TestCase):
   """Tests for dataflow/model/par-do."""
@@ -106,7 +103,8 @@ class ParDoTest(unittest.TestCase):
     # pylint: disable=line-too-long
     words = ['aa', 'bbc', 'defg']
     # [START model_pardo_with_label]
-    result = words | 'CountUniqueLetters' >> beam.Map(lambda word: len(set(word)))
+    result = words | 'CountUniqueLetters' >> beam.Map(
+        lambda word: len(set(word)))
     # [END model_pardo_with_label]
 
     self.assertEqual({1, 2, 4}, set(result))
@@ -350,6 +348,80 @@ class TypeHintsTest(unittest.TestCase):
 
 
 class SnippetsTest(unittest.TestCase):
+  # Replacing text read/write transforms with dummy transforms for testing.
+  class DummyReadTransform(beam.PTransform):
+    """A transform that will replace iobase.ReadFromText.
+
+    To be used for testing.
+    """
+
+    def __init__(self, file_to_read=None):
+      self.file_to_read = file_to_read
+
+    class ReadDoFn(beam.DoFn):
+
+      def __init__(self, file_to_read):
+        self.file_to_read = file_to_read
+        self.coder = coders.StrUtf8Coder()
+
+      def process(self, context):
+        pass
+
+      def finish_bundle(self, context):
+        assert self.file_to_read
+        for file_name in glob.glob(self.file_to_read):
+          with open(file_name) as file:
+            for record in file:
+              yield self.coder.decode(record.rstrip('\n'))
+
+    def expand(self, pcoll):
+      return pcoll | beam.Create([None]) | 'DummyReadForTesting' >> beam.ParDo(
+          SnippetsTest.DummyReadTransform.ReadDoFn(self.file_to_read))
+
+  class DummyWriteTransform(beam.PTransform):
+    """A transform that will replace iobase.WriteToText.
+
+    To be used for testing.
+    """
+
+    def __init__(self, file_to_write=None, file_name_suffix=''):
+      self.file_to_write = file_to_write
+
+    class WriteDoFn(beam.DoFn):
+      def __init__(self, file_to_write):
+        self.file_to_write = file_to_write
+        self.file_obj = None
+        self.coder = coders.ToStringCoder()
+
+      def start_bundle(self, context):
+        assert self.file_to_write
+        self.file_to_write += str(uuid.uuid4())
+        self.file_obj = open(self.file_to_write, 'w')
+
+      def process(self, context):
+        assert self.file_obj
+        self.file_obj.write(self.coder.encode(context.element) + '\n')
+
+      def finish_bundle(self, context):
+        assert self.file_obj
+        self.file_obj.close()
+
+    def expand(self, pcoll):
+      return pcoll | 'DummyWriteForTesting' >> beam.ParDo(
+          SnippetsTest.DummyWriteTransform.WriteDoFn(self.file_to_write))
+
+  def setUp(self):
+    self.old_read_from_text = beam.io.ReadFromText
+    self.old_write_to_text = beam.io.WriteToText
+
+    # Monkey patching to allow testing pipelines defined in snippets.py using
+    # real data.
+    beam.io.ReadFromText = SnippetsTest.DummyReadTransform
+    beam.io.WriteToText = SnippetsTest.DummyWriteTransform
+
+  def tearDown(self):
+    beam.io.ReadFromText = self.old_read_from_text
+    beam.io.WriteToText = self.old_write_to_text
 
   def create_temp_file(self, contents=''):
     with tempfile.NamedTemporaryFile(delete=False) as f:
@@ -357,12 +429,16 @@ class SnippetsTest(unittest.TestCase):
       return f.name
 
   def get_output(self, path, sorted_output=True, suffix=''):
-    with open(path + '-00000-of-00001' + suffix) as f:
-      lines = f.readlines()
+    all_lines = []
+    for file_name in glob.glob(path + '*'):
+      with open(file_name) as f:
+        lines = f.readlines()
+        all_lines.extend([s.rstrip('\n') for s in lines])
+
     if sorted_output:
-      return sorted(s.rstrip('\n') for s in lines)
+      return sorted(s.rstrip('\n') for s in all_lines)
     else:
-      return [s.rstrip('\n') for s in lines]
+      return all_lines
 
   def test_model_pipelines(self):
     temp_path = self.create_temp_file('aa bb cc\n bb cc\n cc')


Mime
View raw message