beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From pabl...@apache.org
Subject [beam] branch master updated: FileIO reading transforms. First implementation.
Date Sat, 09 Mar 2019 00:25:47 GMT
This is an automated email from the ASF dual-hosted git repository.

pabloem pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b1aa20  FileIO reading transforms. First implementation.
     new dd83432  Merge pull request #7791 from pabloem/fileio-read
4b1aa20 is described below

commit 4b1aa20f00b0fef0e1f684e4d58705460d71c36b
Author: Pablo <pabloem@google.com>
AuthorDate: Fri Feb 8 11:48:14 2019 -0800

    FileIO reading transforms. First implementation.
---
 sdks/python/apache_beam/io/filebasedsink_test.py |  10 +-
 sdks/python/apache_beam/io/fileio.py             | 171 +++++++++++++++++
 sdks/python/apache_beam/io/fileio_test.py        | 235 +++++++++++++++++++++++
 3 files changed, 414 insertions(+), 2 deletions(-)

diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py
index 5069da5..a934381 100644
--- a/sdks/python/apache_beam/io/filebasedsink_test.py
+++ b/sdks/python/apache_beam/io/filebasedsink_test.py
@@ -65,12 +65,18 @@ class _TestCaseWithTempDirCleanUp(unittest.TestCase):
     self._tempdirs.append(result)
     return result
 
-  def _create_temp_file(self, name='', suffix=''):
+  def _create_temp_file(self, name='', suffix='', dir=None, content=None):
+    """Creates a temporary file and returns its path.
+
+    Args:
+      name: file name prefix; ``tempfile.template`` is used when empty.
+      suffix: file name suffix.
+      dir: directory in which to create the file; when not given, a new
+        directory obtained from ``self._new_tempdir()`` is used.
+      content: optional text written to the file. When empty or None the
+        file is left empty.
+    """
     if not name:
       name = tempfile.template
+    if not dir:
+      dir = self._new_tempdir()
     file_name = tempfile.NamedTemporaryFile(
         delete=False, prefix=name,
-        dir=self._new_tempdir(), suffix=suffix).name
+        dir=dir, suffix=suffix).name
+
+    # delete=False keeps the file on disk, so it can be reopened here by name
+    # to write the requested content.
+    if content:
+      with open(file_name, 'w') as f:
+        f.write(content)
     return file_name
 
 
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
new file mode 100644
index 0000000..65ce76b
--- /dev/null
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -0,0 +1,171 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""``PTransforms`` for manipulating files in Apache Beam.
+
+Provides reading ``PTransform``\\s: ``MatchFiles`` and ``MatchAll``, which
+produce a ``PCollection`` of records representing a file and its metadata;
+and ``ReadMatches``, which takes in a ``PCollection`` of file metadata
+records and produces a ``PCollection`` of ``ReadableFile`` objects.
+These transforms currently do not support splitting by themselves.
+
+No backward compatibility guarantees. Everything in this module is experimental.
+"""
+
+from __future__ import absolute_import
+
+from past.builtins import unicode
+
+import apache_beam as beam
+from apache_beam.io import filesystem
+from apache_beam.io import filesystems
+from apache_beam.io.filesystem import BeamIOError
+from apache_beam.utils.annotations import experimental
+
+# Public API of this module.
+__all__ = [
+    'EmptyMatchTreatment',
+    'MatchFiles',
+    'MatchAll',
+    'ReadableFile',
+    'ReadMatches',
+]
+
+
+class EmptyMatchTreatment(object):
+  """How to treat empty matches in ``MatchAll`` and ``MatchFiles`` transforms.
+
+  If empty matches are disallowed, an error will be thrown if a pattern does not
+  match any files.
+
+  * ``ALLOW``: an empty match is always accepted.
+  * ``DISALLOW``: an empty match is always rejected.
+  * ``ALLOW_IF_WILDCARD``: an empty match is accepted only when the pattern
+    contains a ``*`` wildcard.
+  """
+
+  ALLOW = 'ALLOW'
+  DISALLOW = 'DISALLOW'
+  ALLOW_IF_WILDCARD = 'ALLOW_IF_WILDCARD'
+
+  @staticmethod
+  def allow_empty_match(pattern, setting):
+    """Returns whether an empty match of ``pattern`` is acceptable.
+
+    Args:
+      pattern: the file pattern that was matched.
+      setting: one of the ``EmptyMatchTreatment`` constants.
+
+    Raises:
+      ValueError: if ``setting`` is not a recognized treatment.
+    """
+    if setting == EmptyMatchTreatment.ALLOW:
+      return True
+    elif setting == EmptyMatchTreatment.ALLOW_IF_WILDCARD:
+      # Without a wildcard the empty match is rejected (like DISALLOW); it
+      # must not fall through to the "unknown setting" ValueError below.
+      return '*' in pattern
+    elif setting == EmptyMatchTreatment.DISALLOW:
+      return False
+    else:
+      raise ValueError(setting)
+
+
+class _MatchAllFn(beam.DoFn):
+  """Expands one file pattern into the ``FileMetadata`` of matching files.
+
+  Raises ``BeamIOError`` when nothing matches and the configured
+  ``EmptyMatchTreatment`` does not allow an empty match.
+  """
+
+  def __init__(self, empty_match_treatment):
+    self._empty_match_treatment = empty_match_treatment
+
+  def process(self, file_pattern):
+    # TODO: Should we batch the lookups?
+    # Only one pattern is looked up, so only the first result is relevant.
+    first_result = filesystems.FileSystems.match([file_pattern])[0]
+    matched_files = first_result.metadata_list
+
+    if matched_files:
+      return matched_files
+
+    if not EmptyMatchTreatment.allow_empty_match(
+        file_pattern, self._empty_match_treatment):
+      raise BeamIOError(
+          'Empty match for pattern %s. Disallowed.' % file_pattern)
+    return matched_files
+
+
+@experimental()
+class MatchFiles(beam.PTransform):
+  """Matches a file pattern using ``FileSystems.match``.
+
+  This ``PTransform`` returns a ``PCollection`` of matching files in the form
+  of ``FileMetadata`` objects.
+
+  Args:
+    file_pattern: the single pattern to match.
+    empty_match_treatment: an ``EmptyMatchTreatment`` value that controls
+      whether a pattern matching no files is an error.
+  """
+
+  def __init__(self,
+               file_pattern,
+               empty_match_treatment=EmptyMatchTreatment.ALLOW_IF_WILDCARD):
+    self._file_pattern = file_pattern
+    self._empty_match_treatment = empty_match_treatment
+
+  def expand(self, pcoll):
+    # Forward the configured treatment; otherwise MatchAll falls back to its
+    # own default (ALLOW) and this transform's setting is silently ignored.
+    return (pcoll.pipeline
+            | beam.Create([self._file_pattern])
+            | MatchAll(self._empty_match_treatment))
+
+
+@experimental()
+class MatchAll(beam.PTransform):
+  """Matches file patterns from the input PCollection via ``FileSystems.match``.
+
+  Each input element is a file pattern. This ``PTransform`` returns a
+  ``PCollection`` of matching files in the form of ``FileMetadata`` objects.
+
+  Args:
+    empty_match_treatment: an ``EmptyMatchTreatment`` value that controls
+      whether a pattern matching no files is an error.
+  """
+
+  def __init__(self, empty_match_treatment=EmptyMatchTreatment.ALLOW):
+    self._empty_match_treatment = empty_match_treatment
+
+  def expand(self, pcoll):
+    match_fn = _MatchAllFn(self._empty_match_treatment)
+    return pcoll | beam.ParDo(match_fn)
+
+
+class _ReadMatchesFn(beam.DoFn):
+  """Turns file metadata (or plain path strings) into ``ReadableFile``s."""
+
+  def __init__(self, compression, skip_directories):
+    self._compression = compression
+    self._skip_directories = skip_directories
+
+  def process(self, file_metadata):
+    # Bare path strings are accepted too; they are wrapped with a size of 0.
+    metadata = (filesystem.FileMetadata(file_metadata, 0)
+                if isinstance(file_metadata, (str, unicode))
+                else file_metadata)
+
+    # Paths ending in '/' are treated as directories, which cannot be read.
+    if metadata.path.endswith('/'):
+      if self._skip_directories:
+        return
+      raise BeamIOError(
+          'Directories are not allowed in ReadMatches transform. '
+          'Found %s.' % metadata.path)
+
+    # TODO: Mime type? Other arguments? Maybe arguments passed in to transform?
+    yield ReadableFile(metadata, self._compression)
+
+
+class ReadableFile(object):
+  """A utility class for accessing files.
+
+  Args:
+    metadata: the ``FileMetadata`` of the file to access.
+    compression: optional ``CompressionTypes`` value used by ``open`` when no
+      explicit ``compression_type`` is given; ``CompressionTypes.AUTO`` is
+      used when neither is set.
+  """
+
+  def __init__(self, metadata, compression=None):
+    self.metadata = metadata
+    self._compression = compression
+
+  def open(self, mime_type='text/plain', compression_type=None):
+    """Opens the file for reading and returns a file-like object.
+
+    The caller is responsible for closing the returned object.
+    """
+    compression = (compression_type
+                   or self._compression
+                   or filesystem.CompressionTypes.AUTO)
+    return filesystems.FileSystems.open(
+        self.metadata.path, mime_type=mime_type, compression_type=compression)
+
+  def read(self):
+    """Returns the whole contents of the file, closing it afterwards."""
+    f = self.open()
+    try:
+      return f.read()
+    finally:
+      f.close()
+
+  def read_utf8(self):
+    """Returns the whole contents of the file decoded as UTF-8."""
+    return self.read().decode('utf-8')
+
+
+@experimental()
+class ReadMatches(beam.PTransform):
+  """Converts each result of MatchFiles() or MatchAll() to a ReadableFile.
+
+  This helps read in a file's contents or obtain a file descriptor. Plain path
+  strings are accepted as input elements as well.
+
+  Args:
+    compression: passed through to the underlying reading ``DoFn``.
+    skip_directories: if true, inputs whose path ends in '/' are dropped;
+      otherwise they cause a ``BeamIOError``.
+  """
+
+  def __init__(self, compression=None, skip_directories=True):
+    self._compression = compression
+    self._skip_directories = skip_directories
+
+  def expand(self, pcoll):
+    read_fn = _ReadMatchesFn(self._compression, self._skip_directories)
+    return pcoll | beam.ParDo(read_fn)
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
new file mode 100644
index 0000000..8963da1
--- /dev/null
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -0,0 +1,235 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Tests for transforms defined in apache_beam.io.fileio."""
+
+from __future__ import absolute_import
+
+import csv
+import io
+import logging
+import sys
+import unittest
+
+from nose.plugins.attrib import attr
+
+import apache_beam as beam
+from apache_beam.io import fileio
+from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_utils import compute_hash
+from apache_beam.testing.util import assert_that
+from apache_beam.testing.util import equal_to
+
+
+class MatchTest(_TestCaseWithTempDirCleanUp):
+  """Tests for ``fileio.MatchFiles`` and ``fileio.MatchAll``."""
+
+  def test_basic_two_files(self):
+    files = []
+    tempdir = '%s/' % self._new_tempdir()
+
+    # Create a couple files to be matched
+    files.append(self._create_temp_file(dir=tempdir))
+    files.append(self._create_temp_file(dir=tempdir))
+
+    with TestPipeline() as p:
+      files_pc = p | fileio.MatchFiles(tempdir) | beam.Map(lambda x: x.path)
+
+      assert_that(files_pc, equal_to(files))
+
+  def test_match_all_two_directories(self):
+    files = []
+    directories = []
+
+    for _ in range(2):
+      # TODO: What about this having to append the ending slash?
+      d = '%s/' % self._new_tempdir()
+      directories.append(d)
+
+      files.append(self._create_temp_file(dir=d))
+      files.append(self._create_temp_file(dir=d))
+
+    with TestPipeline() as p:
+      files_pc = (p
+                  | beam.Create(directories)
+                  | fileio.MatchAll()
+                  | beam.Map(lambda x: x.path))
+
+      assert_that(files_pc, equal_to(files))
+
+  def test_match_files_one_directory_failure(self):
+    # The second directory is empty, which DISALLOW must reject.
+    directories = [
+        '%s/' % self._new_tempdir(),
+        '%s/' % self._new_tempdir()]
+
+    files = list()
+    files.append(self._create_temp_file(dir=directories[0]))
+    files.append(self._create_temp_file(dir=directories[0]))
+
+    with self.assertRaises(beam.io.filesystem.BeamIOError):
+      with TestPipeline() as p:
+        files_pc = (
+            p
+            | beam.Create(directories)
+            | fileio.MatchAll(fileio.EmptyMatchTreatment.DISALLOW)
+            | beam.Map(lambda x: x.path))
+
+        assert_that(files_pc, equal_to(files))
+
+  # Previously this test reused the name above, which silently shadowed the
+  # DISALLOW test so it never ran.
+  def test_match_files_one_directory_allow_if_wildcard(self):
+    # The second directory is empty, but its pattern has a wildcard, so
+    # ALLOW_IF_WILDCARD must accept the empty match.
+    directories = [
+        '%s/' % self._new_tempdir(),
+        '%s/' % self._new_tempdir()]
+
+    files = list()
+    files.append(self._create_temp_file(dir=directories[0]))
+    files.append(self._create_temp_file(dir=directories[0]))
+
+    with TestPipeline() as p:
+      files_pc = (
+          p
+          | beam.Create(['%s*' % d for d in directories])
+          | fileio.MatchAll(fileio.EmptyMatchTreatment.ALLOW_IF_WILDCARD)
+          | beam.Map(lambda x: x.path))
+
+      assert_that(files_pc, equal_to(files))
+
+
+class ReadTest(_TestCaseWithTempDirCleanUp):
+  """Tests for ``fileio.ReadMatches`` and ``fileio.ReadableFile``."""
+
+  def test_basic_file_name_provided(self):
+    content = 'TestingMyContent\nIn multiple lines\nhaha!'
+    tempdir = '%s/' % self._new_tempdir()
+    self._create_temp_file(dir=tempdir, content=content)
+
+    with TestPipeline() as p:
+      readable_files = (p
+                        | beam.Create([tempdir])
+                        | fileio.MatchAll()
+                        | fileio.ReadMatches())
+      content_pc = readable_files | beam.Map(
+          lambda f: f.read().decode('utf-8'))
+
+      assert_that(content_pc, equal_to([content]))
+
+  def test_csv_file_source(self):
+    content = 'name,year,place\ngoogle,1999,CA\nspotify,2006,sweden'
+    rows = [line.split(',') for line in content.split('\n')]
+
+    tempdir = '%s/' % self._new_tempdir()
+    self._create_temp_file(dir=tempdir, content=content)
+
+    def get_csv_reader(readable_file):
+      stream = readable_file.open()
+      # On Python 3 the csv module needs a text stream, so wrap the file.
+      if sys.version_info[0] >= 3:
+        stream = io.TextIOWrapper(stream)
+      return csv.reader(stream)
+
+    with TestPipeline() as p:
+      content_pc = (p
+                    | beam.Create([tempdir])
+                    | fileio.MatchAll()
+                    | fileio.ReadMatches()
+                    | beam.FlatMap(get_csv_reader))
+
+      assert_that(content_pc, equal_to(rows))
+
+  def test_string_filenames_and_skip_directory(self):
+    content = 'thecontent\n'
+    tempdir = '%s/' % self._new_tempdir()
+
+    # Create a couple files to be matched
+    files = [self._create_temp_file(dir=tempdir, content=content)
+             for _ in range(2)]
+
+    with TestPipeline() as p:
+      contents_pc = (p
+                     | beam.Create(files + [tempdir])
+                     | fileio.ReadMatches()
+                     | beam.Map(lambda x: x.read().decode('utf-8')))
+
+      assert_that(contents_pc, equal_to([content, content]))
+
+  def test_fail_on_directories(self):
+    content = 'thecontent\n'
+    tempdir = '%s/' % self._new_tempdir()
+
+    # Create a couple files to be matched
+    files = [self._create_temp_file(dir=tempdir, content=content)
+             for _ in range(2)]
+
+    # The trailing-slash directory path must be rejected.
+    with self.assertRaises(beam.io.filesystem.BeamIOError):
+      with TestPipeline() as p:
+        _ = (p
+             | beam.Create(files + [tempdir])
+             | fileio.ReadMatches(skip_directories=False)
+             | beam.Map(lambda x: x.read_utf8()))
+
+
+class MatchIntegrationTest(unittest.TestCase):
+  """Integration test that matches and reads files stored on GCS."""
+
+  INPUT_FILE = 'gs://dataflow-samples/shakespeare/kinglear.txt'
+  KINGLEAR_CHECKSUM = 'f418b25f1507f5a901257026b035ac2857a7ab87'
+  INPUT_FILE_LARGE = (
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-00000000000*.json')
+
+  # Every file that INPUT_FILE_LARGE is expected to match.
+  WIKI_FILES = [
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000000.json',
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000001.json',
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000002.json',
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000003.json',
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000004.json',
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000005.json',
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000006.json',
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000007.json',
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000008.json',
+      'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000009.json',
+  ]
+
+  def setUp(self):
+    self.test_pipeline = TestPipeline(is_integration_test=True)
+
+  @attr('IT')
+  def test_transform_on_gcs(self):
+    args = self.test_pipeline.get_full_options_as_args()
+    expected_paths = [self.INPUT_FILE] + self.WIKI_FILES
+
+    with beam.Pipeline(argv=args) as p:
+      # Match a single file plus a glob, and check every resolved path.
+      patterns_pc = p | beam.Create([self.INPUT_FILE, self.INPUT_FILE_LARGE])
+      matches_pc = (patterns_pc
+                    | fileio.MatchAll()
+                    | 'GetPath' >> beam.Map(lambda metadata: metadata.path))
+      assert_that(matches_pc, equal_to(expected_paths), label='Matched Files')
+
+      # Read the single file back and compare a checksum of its lines.
+      checksum_pc = (p
+                     | 'SingleFile' >> beam.Create([self.INPUT_FILE])
+                     | 'MatchOneAll' >> fileio.MatchAll()
+                     | fileio.ReadMatches()
+                     | 'ReadIn' >> beam.Map(lambda x: x.read_utf8().split('\n'))
+                     | 'Checksums' >> beam.Map(compute_hash))
+      assert_that(checksum_pc,
+                  equal_to([self.KINGLEAR_CHECKSUM]),
+                  label='Assert Checksums')
+
+
+if __name__ == '__main__':
+  root_logger = logging.getLogger()
+  root_logger.setLevel(logging.INFO)
+  unittest.main()


Mime
View raw message