From commits-return-101150-archive-asf-public=cust-asf.ponee.io@beam.apache.org Sat Mar 9 00:25:50 2019 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id CC946180626 for ; Sat, 9 Mar 2019 01:25:49 +0100 (CET) Received: (qmail 5947 invoked by uid 500); 9 Mar 2019 00:25:49 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 5936 invoked by uid 99); 9 Mar 2019 00:25:48 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 09 Mar 2019 00:25:48 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id 5981687990; Sat, 9 Mar 2019 00:25:48 +0000 (UTC) Date: Sat, 09 Mar 2019 00:25:47 +0000 To: "commits@beam.apache.org" Subject: [beam] branch master updated: FileIO reading transforms. First implementation. MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <155209114613.24088.6982151141562551946@gitbox.apache.org> From: pabloem@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: beam X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: 6c4457461ef70c4fd0c8c31b8c06ea7f66d71657 X-Git-Newrev: 4b1aa20f00b0fef0e1f684e4d58705460d71c36b X-Git-Rev: 4b1aa20f00b0fef0e1f684e4d58705460d71c36b X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. pabloem pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git The following commit(s) were added to refs/heads/master by this push: new 4b1aa20 FileIO reading transforms. First implementation. new dd83432 Merge pull request #7791 from pabloem/fileio-read 4b1aa20 is described below commit 4b1aa20f00b0fef0e1f684e4d58705460d71c36b Author: Pablo AuthorDate: Fri Feb 8 11:48:14 2019 -0800 FileIO reading transforms. First implementation. --- sdks/python/apache_beam/io/filebasedsink_test.py | 10 +- sdks/python/apache_beam/io/fileio.py | 171 +++++++++++++++++ sdks/python/apache_beam/io/fileio_test.py | 235 +++++++++++++++++++++++ 3 files changed, 414 insertions(+), 2 deletions(-) diff --git a/sdks/python/apache_beam/io/filebasedsink_test.py b/sdks/python/apache_beam/io/filebasedsink_test.py index 5069da5..a934381 100644 --- a/sdks/python/apache_beam/io/filebasedsink_test.py +++ b/sdks/python/apache_beam/io/filebasedsink_test.py @@ -65,12 +65,18 @@ class _TestCaseWithTempDirCleanUp(unittest.TestCase): self._tempdirs.append(result) return result - def _create_temp_file(self, name='', suffix=''): + def _create_temp_file(self, name='', suffix='', dir=None, content=None): if not name: name = tempfile.template + if not dir: + dir = self._new_tempdir() file_name = tempfile.NamedTemporaryFile( delete=False, prefix=name, - dir=self._new_tempdir(), suffix=suffix).name + dir=dir, suffix=suffix).name + + if content: + with open(file_name, 'w') as f: + f.write(content) return file_name diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py new file mode 100644 index 0000000..65ce76b --- /dev/null +++ b/sdks/python/apache_beam/io/fileio.py @@ -0,0 +1,171 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""``PTransforms`` for manipulating files in Apache Beam. + +Provides reading ``PTransform``\\s, ``MatchFiles``, +``MatchAll``, that produces a ``PCollection`` of records representing a file +and its metadata; and ``ReadMatches``, which takes in a ``PCollection`` of file +metadata records, and produces a ``PCollection`` of ``ReadableFile`` objects. +These transforms currently do not support splitting by themselves. + +No backward compatibility guarantees. Everything in this module is experimental. +""" + +from __future__ import absolute_import + +from past.builtins import unicode + +import apache_beam as beam +from apache_beam.io import filesystem +from apache_beam.io import filesystems +from apache_beam.io.filesystem import BeamIOError +from apache_beam.utils.annotations import experimental + +__all__ = ['EmptyMatchTreatment', + 'MatchFiles', + 'MatchAll', + 'ReadableFile', + 'ReadMatches'] + + +class EmptyMatchTreatment(object): + """How to treat empty matches in ``MatchAll`` and ``MatchFiles`` transforms. + + If empty matches are disallowed, an error will be thrown if a pattern does not + match any files.""" + + ALLOW = 'ALLOW' + DISALLOW = 'DISALLOW' + ALLOW_IF_WILDCARD = 'ALLOW_IF_WILDCARD' + + @staticmethod + def allow_empty_match(pattern, setting): + if setting == EmptyMatchTreatment.ALLOW: + return True + elif setting == EmptyMatchTreatment.ALLOW_IF_WILDCARD and '*' in pattern: + return True + elif setting == EmptyMatchTreatment.DISALLOW: + return False + else: + raise ValueError(setting) + + +class _MatchAllFn(beam.DoFn): + + def __init__(self, empty_match_treatment): + self._empty_match_treatment = empty_match_treatment + + def process(self, file_pattern): + # TODO: Should we batch the lookups? + match_results = filesystems.FileSystems.match([file_pattern]) + match_result = match_results[0] + + if (not match_result.metadata_list + and not EmptyMatchTreatment.allow_empty_match( + file_pattern, self._empty_match_treatment)): + raise BeamIOError( + 'Empty match for pattern %s. Disallowed.' % file_pattern) + + return match_result.metadata_list + + +@experimental() +class MatchFiles(beam.PTransform): + """Matches a file pattern using ``FileSystems.match``. + + This ``PTransform`` returns a ``PCollection`` of matching files in the form + of ``FileMetadata`` objects.""" + + def __init__(self, + file_pattern, + empty_match_treatment=EmptyMatchTreatment.ALLOW_IF_WILDCARD): + self._file_pattern = file_pattern + self._empty_match_treatment = empty_match_treatment + + def expand(self, pcoll): + return (pcoll.pipeline + | beam.Create([self._file_pattern]) + | MatchAll()) + + +@experimental() +class MatchAll(beam.PTransform): + """Matches file patterns from the input PCollection via ``FileSystems.match``. + + This ``PTransform`` returns a ``PCollection`` of matching files in the form + of ``FileMetadata`` objects.""" + + def __init__(self, empty_match_treatment=EmptyMatchTreatment.ALLOW): + self._empty_match_treatment = empty_match_treatment + + def expand(self, pcoll): + return (pcoll + | beam.ParDo(_MatchAllFn(self._empty_match_treatment))) + + +class _ReadMatchesFn(beam.DoFn): + + def __init__(self, compression, skip_directories): + self._compression = compression + self._skip_directories = skip_directories + + def process(self, file_metadata): + metadata = (filesystem.FileMetadata(file_metadata, 0) + if isinstance(file_metadata, (str, unicode)) + else file_metadata) + + if metadata.path.endswith('/') and self._skip_directories: + return + elif metadata.path.endswith('/'): + raise BeamIOError( + 'Directories are not allowed in ReadMatches transform.' + 'Found %s.' % metadata.path) + + # TODO: Mime type? Other arguments? Maybe arguments passed in to transform? + yield ReadableFile(metadata) + + +class ReadableFile(object): + """A utility class for accessing files.""" + + def __init__(self, metadata): + self.metadata = metadata + + def open(self, mime_type='text/plain'): + return filesystems.FileSystems.open(self.metadata.path) + + def read(self): + return self.open().read() + + def read_utf8(self): + return self.open().read().decode('utf-8') + + +@experimental() +class ReadMatches(beam.PTransform): + """Converts each result of MatchFiles() or MatchAll() to a ReadableFile. + + This helps read in a file's contents or obtain a file descriptor.""" + + def __init__(self, compression=None, skip_directories=True): + self._compression = compression + self._skip_directories = skip_directories + + def expand(self, pcoll): + return pcoll | beam.ParDo(_ReadMatchesFn(self._compression, + self._skip_directories)) diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py new file mode 100644 index 0000000..8963da1 --- /dev/null +++ b/sdks/python/apache_beam/io/fileio_test.py @@ -0,0 +1,235 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +"""Tests for transforms defined in apache_beam.io.fileio.""" + +from __future__ import absolute_import + +import csv +import io +import logging +import sys +import unittest + +from nose.plugins.attrib import attr + +import apache_beam as beam +from apache_beam.io import fileio +from apache_beam.io.filebasedsink_test import _TestCaseWithTempDirCleanUp +from apache_beam.testing.test_pipeline import TestPipeline +from apache_beam.testing.test_utils import compute_hash +from apache_beam.testing.util import assert_that +from apache_beam.testing.util import equal_to + + +class MatchTest(_TestCaseWithTempDirCleanUp): + + def test_basic_two_files(self): + files = [] + tempdir = '%s/' % self._new_tempdir() + + # Create a couple files to be matched + files.append(self._create_temp_file(dir=tempdir)) + files.append(self._create_temp_file(dir=tempdir)) + + with TestPipeline() as p: + files_pc = p | fileio.MatchFiles(tempdir) | beam.Map(lambda x: x.path) + + assert_that(files_pc, equal_to(files)) + + def test_match_all_two_directories(self): + files = [] + directories = [] + + for _ in range(2): + # TODO: What about this having to append the ending slash? + d = '%s/' % self._new_tempdir() + directories.append(d) + + files.append(self._create_temp_file(dir=d)) + files.append(self._create_temp_file(dir=d)) + + with TestPipeline() as p: + files_pc = (p + | beam.Create(directories) + | fileio.MatchAll() + | beam.Map(lambda x: x.path)) + + assert_that(files_pc, equal_to(files)) + + def test_match_files_one_directory_failure(self): + directories = [ + '%s/' % self._new_tempdir(), + '%s/' % self._new_tempdir()] + + files = list() + files.append(self._create_temp_file(dir=directories[0])) + files.append(self._create_temp_file(dir=directories[0])) + + with self.assertRaises(beam.io.filesystem.BeamIOError): + with TestPipeline() as p: + files_pc = ( + p + | beam.Create(directories) + | fileio.MatchAll(fileio.EmptyMatchTreatment.DISALLOW) + | beam.Map(lambda x: x.path)) + + assert_that(files_pc, equal_to(files)) + + def test_match_files_one_directory_failure(self): + directories = [ + '%s/' % self._new_tempdir(), + '%s/' % self._new_tempdir()] + + files = list() + files.append(self._create_temp_file(dir=directories[0])) + files.append(self._create_temp_file(dir=directories[0])) + + with TestPipeline() as p: + files_pc = ( + p + | beam.Create(['%s*' % d for d in directories]) + | fileio.MatchAll(fileio.EmptyMatchTreatment.ALLOW_IF_WILDCARD) + | beam.Map(lambda x: x.path)) + + assert_that(files_pc, equal_to(files)) + + +class ReadTest(_TestCaseWithTempDirCleanUp): + + def test_basic_file_name_provided(self): + content = 'TestingMyContent\nIn multiple lines\nhaha!' + dir = '%s/' % self._new_tempdir() + self._create_temp_file(dir=dir, content=content) + + with TestPipeline() as p: + content_pc = (p + | beam.Create([dir]) + | fileio.MatchAll() + | fileio.ReadMatches() + | beam.Map(lambda f: f.read().decode('utf-8'))) + + assert_that(content_pc, equal_to([content])) + + def test_csv_file_source(self): + content = 'name,year,place\ngoogle,1999,CA\nspotify,2006,sweden' + rows = [r.split(',') for r in content.split('\n')] + + dir = '%s/' % self._new_tempdir() + self._create_temp_file(dir=dir, content=content) + + def get_csv_reader(readable_file): + if sys.version_info >= (3, 0): + return csv.reader(io.TextIOWrapper(readable_file.open())) + else: + return csv.reader(readable_file.open()) + + with TestPipeline() as p: + content_pc = (p + | beam.Create([dir]) + | fileio.MatchAll() + | fileio.ReadMatches() + | beam.FlatMap(get_csv_reader)) + + assert_that(content_pc, equal_to(rows)) + + def test_string_filenames_and_skip_directory(self): + content = 'thecontent\n' + files = [] + tempdir = '%s/' % self._new_tempdir() + + # Create a couple files to be matched + files.append(self._create_temp_file(dir=tempdir, content=content)) + files.append(self._create_temp_file(dir=tempdir, content=content)) + + with TestPipeline() as p: + contents_pc = (p + | beam.Create(files + [tempdir]) + | fileio.ReadMatches() + | beam.Map(lambda x: x.read().decode('utf-8'))) + + assert_that(contents_pc, equal_to([content]*2)) + + def test_fail_on_directories(self): + content = 'thecontent\n' + files = [] + tempdir = '%s/' % self._new_tempdir() + + # Create a couple files to be matched + files.append(self._create_temp_file(dir=tempdir, content=content)) + files.append(self._create_temp_file(dir=tempdir, content=content)) + + with self.assertRaises(beam.io.filesystem.BeamIOError): + with TestPipeline() as p: + _ = (p + | beam.Create(files + [tempdir]) + | fileio.ReadMatches(skip_directories=False) + | beam.Map(lambda x: x.read_utf8())) + + +class MatchIntegrationTest(unittest.TestCase): + + INPUT_FILE = 'gs://dataflow-samples/shakespeare/kinglear.txt' + KINGLEAR_CHECKSUM = 'f418b25f1507f5a901257026b035ac2857a7ab87' + INPUT_FILE_LARGE = ( + 'gs://dataflow-samples/wikipedia_edits/wiki_data-00000000000*.json') + + WIKI_FILES = [ + 'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000000.json', + 'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000001.json', + 'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000002.json', + 'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000003.json', + 'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000004.json', + 'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000005.json', + 'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000006.json', + 'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000007.json', + 'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000008.json', + 'gs://dataflow-samples/wikipedia_edits/wiki_data-000000000009.json', + ] + + def setUp(self): + self.test_pipeline = TestPipeline(is_integration_test=True) + + @attr('IT') + def test_transform_on_gcs(self): + args = self.test_pipeline.get_full_options_as_args() + + with beam.Pipeline(argv=args) as p: + matches_pc = (p + | beam.Create([self.INPUT_FILE, self.INPUT_FILE_LARGE]) + | fileio.MatchAll() + | 'GetPath' >> beam.Map(lambda metadata: metadata.path)) + + assert_that(matches_pc, + equal_to([self.INPUT_FILE] + self.WIKI_FILES), + label='Matched Files') + + checksum_pc = (p + | 'SingleFile' >> beam.Create([self.INPUT_FILE]) + | 'MatchOneAll' >> fileio.MatchAll() + | fileio.ReadMatches() + | 'ReadIn' >> beam.Map(lambda x: x.read_utf8().split('\n')) + | 'Checksums' >> beam.Map(compute_hash)) + + assert_that(checksum_pc, + equal_to([self.KINGLEAR_CHECKSUM]), + label='Assert Checksums') + + +if __name__ == '__main__': + logging.getLogger().setLevel(logging.INFO) + unittest.main()