From: davor@apache.org
To: commits@beam.incubator.apache.org
Date: Tue, 29 Nov 2016 01:41:30 -0000
Message-Id: <3c4a18cf472d41c8abfbc938b1f1ea1b@git.apache.org>
Subject: [1/2] incubator-beam git commit: Improve the speed of getting file sizes

Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 1530a1727 -> ad4dc87a4


Improve the speed of getting file sizes

Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7a059d37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7a059d37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7a059d37

Branch: refs/heads/python-sdk
Commit: 7a059d37e71b62702e8cdeafec6956fc7e1e38c4
Parents: 1530a17
Author: Sourabh Bajaj
Authored: Mon Nov 21 15:50:21 2016 -0800
Committer: Davor Bonaci
Committed: Mon Nov 28 17:40:37 2016 -0800
----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py | 42 ++++++++++++++--------
 sdks/python/apache_beam/io/fileio.py          | 31 ++++++++++++++++
 sdks/python/apache_beam/io/fileio_test.py     | 41 +++++++++++++++++++++
 sdks/python/apache_beam/io/gcsio.py           | 25 +++++++++++++
 sdks/python/apache_beam/io/gcsio_test.py      | 38 ++++++++++++++++++++
 5 files changed, 163 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 7d8f686..14eaf27 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -107,7 +107,8 @@ class FileBasedSource(iobase.BoundedSource):
     if self._concat_source is None:
       single_file_sources = []
       file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
-      sizes = FileBasedSource._estimate_sizes_in_parallel(file_names)
+      sizes = FileBasedSource._estimate_sizes_of_files(file_names,
+                                                       self._pattern)
 
       # We create a reference for FileBasedSource that will be serialized along
       # with each _SingleFileSource. To prevent this FileBasedSource from having
@@ -144,22 +145,32 @@ class FileBasedSource(iobase.BoundedSource):
           compression_type=self._compression_type)
 
   @staticmethod
-  def _estimate_sizes_in_parallel(file_names):
+  def _estimate_sizes_of_files(file_names, pattern=None):
+    """Returns the sizes of the given files, ordered to match file_names.
+
+    If pattern is specified, sizes are fetched in one batch via
+    size_of_files_in_glob instead of one request per file, which is faster.
+    """
     if not file_names:
       return []
     elif len(file_names) == 1:
       return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
     else:
-      # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a
-      # child thread. (http://bugs.python.org/issue10015)
-      if not hasattr(threading.current_thread(), '_children'):
-        threading.current_thread()._children = weakref.WeakKeyDictionary()
-      pool = ThreadPool(
-          min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
-      try:
-        return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
-      finally:
-        pool.terminate()
+      if pattern is None:
+        # ThreadPool crashes in old versions of Python (< 2.7.5) if created
+        # from a child thread. (http://bugs.python.org/issue10015)
+        if not hasattr(threading.current_thread(), '_children'):
+          threading.current_thread()._children = weakref.WeakKeyDictionary()
+        pool = ThreadPool(
+            min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
+        try:
+          return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
+        finally:
+          pool.terminate()
+      else:
+        file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern,
+                                                                 file_names)
+        return [file_sizes[f] for f in file_names]
 
   def _validate(self):
     """Validate if there are actual files in the specified glob pattern
@@ -179,7 +190,10 @@ class FileBasedSource(iobase.BoundedSource):
 
     file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
     if (len(file_names) <= FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT):
-      return sum(self._estimate_sizes_in_parallel(file_names))
+      # We are reading very few files, so pass the names without the pattern;
+      # otherwise the glob-based optimization would list everything matching
+      # the pattern and might read far more metadata than these few files need.
+      return sum(self._estimate_sizes_of_files(file_names))
     else:
       # Estimating size of a random sample.
       # TODO: better support distributions where file sizes are not
@@ -188,7 +202,7 @@ class FileBasedSource(iobase.BoundedSource):
           int(len(file_names) * FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT))
       sample = random.sample(file_names, sample_size)
-      estimate = self._estimate_sizes_in_parallel(sample)
+      estimate = self._estimate_sizes_of_files(sample, self._pattern)
       return int(
           sum(estimate) * (float(len(file_names)) / len(sample)))
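
For context on the sampling path in estimate_size above, here is a minimal
standalone sketch of the extrapolation. All file names, sizes, and the two
threshold constants are hypothetical stand-ins (the real thresholds are the
MIN_NUMBER_OF_FILES_TO_STAT and MIN_FRACTION_OF_FILES_TO_STAT class
attributes), and the per-file size lookups are stubbed out rather than
calling _estimate_sizes_of_files:

import random

# Hypothetical stand-ins for the FileBasedSource thresholds.
MIN_FILES_TO_STAT = 25
MIN_FRACTION_TO_STAT = 0.25

file_names = ['gs://bucket/file%03d' % i for i in range(100)]
sample_size = max(MIN_FILES_TO_STAT,
                  int(len(file_names) * MIN_FRACTION_TO_STAT))
sample = random.sample(file_names, sample_size)
sample_sizes = [1024] * len(sample)  # stub for the per-file size lookups
# Scale the sampled total by the ratio of all files to sampled files.
estimate = int(sum(sample_sizes) * (float(len(file_names)) / len(sample)))
print(estimate)  # 102400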
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 30044c3..c71a730 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -606,6 +606,37 @@ class ChannelFactory(object):
     else:
       return os.path.getsize(path)
 
+  @staticmethod
+  def size_of_files_in_glob(path, file_names=None):
+    """Returns a map of file names to sizes.
+
+    Args:
+      path: a file path pattern; the size of every matching file is read.
+      file_names: list of file names whose sizes are required. This exists to
+        support eventually consistent sources, where two expansions of the
+        same glob may yield different file lists.
+    """
+    if path.startswith('gs://'):
+      file_sizes = gcsio.GcsIO().size_of_files_in_glob(path)
+      if file_names is None:
+        return file_sizes
+      else:
+        result = {}
+        # Make sure we fetched a size for every requested file: the GCS
+        # list API is eventually consistent, so call size directly for any
+        # files missing from the listing.
+        for file_name in file_names:
+          if file_name in file_sizes:
+            result[file_name] = file_sizes[file_name]
+          else:
+            result[file_name] = ChannelFactory.size_in_bytes(file_name)
+        return result
+    else:
+      if file_names is None:
+        file_names = ChannelFactory.glob(path)
+      return {file_name: ChannelFactory.size_in_bytes(file_name)
+              for file_name in file_names}
+
 
 class _CompressedFile(object):
   """Somewhat limited file wrapper for easier handling of compressed files."""
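
The eventual-consistency fallback added to ChannelFactory above can be
illustrated in isolation. In this sketch, glob_sizes stands in for a batched
GCS listing result and stat_size for ChannelFactory.size_in_bytes;
sizes_with_fallback is a hypothetical helper, not part of the change:

def sizes_with_fallback(glob_sizes, file_names, stat_size):
  # Take sizes from the batched glob listing where available; stat
  # individually only the names the eventually consistent listing missed.
  return {name: glob_sizes[name] if name in glob_sizes else stat_size(name)
          for name in file_names}

# 'gs://bucket/file2' is absent from the listing, so it costs one stat call.
listing = {'gs://bucket/file1': 1}
print(sizes_with_fallback(listing,
                          ['gs://bucket/file1', 'gs://bucket/file2'],
                          lambda name: 2))
# {'gs://bucket/file1': 1, 'gs://bucket/file2': 2}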
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 098ace1..a68d484 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -582,6 +582,47 @@ class TestTextFileSource(unittest.TestCase):
       self.progress_with_offsets(lines, start_offset=14)
       self.progress_with_offsets(lines, start_offset=20, end_offset=20)
 
+  @mock.patch('apache_beam.io.fileio.gcsio')
+  def test_size_of_files_in_glob_complete(self, *unused_args):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    fileio.gcsio.GcsIO = lambda: gcsio_mock
+    file_names = ['gs://bucket/file1', 'gs://bucket/file2']
+    gcsio_mock.size_of_files_in_glob.return_value = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    expected_results = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    self.assertEqual(
+        fileio.ChannelFactory.size_of_files_in_glob(
+            'gs://bucket/*', file_names),
+        expected_results)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+
+  @mock.patch('apache_beam.io.fileio.gcsio')
+  def test_size_of_files_in_glob_incomplete(self, *unused_args):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    fileio.gcsio.GcsIO = lambda: gcsio_mock
+    file_names = ['gs://bucket/file1', 'gs://bucket/file2']
+    gcsio_mock.size_of_files_in_glob.return_value = {
+        'gs://bucket/file1': 1
+    }
+    gcsio_mock.size.return_value = 2
+    expected_results = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    self.assertEqual(
+        fileio.ChannelFactory.size_of_files_in_glob(
+            'gs://bucket/*', file_names),
+        expected_results)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+    gcsio_mock.size.assert_called_once_with('gs://bucket/file2')
+
 
 class TestNativeTextFileSink(unittest.TestCase):


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 4f310be..9adb946 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -353,6 +353,31 @@ class GcsIO(object):
         bucket=bucket, object=object_path)
     return self.client.objects.Get(request).size
 
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def size_of_files_in_glob(self, pattern):
+    """Returns the sizes of all files matching the glob as a dictionary.
+
+    Args:
+      pattern: a file path pattern; the size of every matching file is read.
+    """
+    bucket, name_pattern = parse_gcs_path(pattern)
+    # Get the prefix with which we can list objects in the given bucket.
+    prefix = re.match('^[^[*?]*', name_pattern).group(0)
+    request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
+    file_sizes = {}
+    while True:
+      response = self.client.objects.List(request)
+      for item in response.items:
+        if fnmatch.fnmatch(item.name, name_pattern):
+          file_name = 'gs://%s/%s' % (item.bucket, item.name)
+          file_sizes[file_name] = item.size
+      if response.nextPageToken:
+        request.pageToken = response.nextPageToken
+      else:
+        break
+    return file_sizes
+
 
 class GcsBufferedReader(object):
   """A class for reading Google Cloud Storage files."""
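
To see how size_of_files_in_glob above turns a glob into a single list
request, this self-contained snippet mirrors its prefix extraction and
fnmatch filtering (the patterns are made up):

import fnmatch
import re

# The listing prefix is the longest literal prefix of the glob: everything
# before the first wildcard character ('[', '*' or '?').
for name_pattern in ['cow/cat/*', 'apple/fish/car?', 'plain/name']:
  prefix = re.match('^[^[*?]*', name_pattern).group(0)
  print('%s -> %s' % (name_pattern, prefix))
# cow/cat/*       -> cow/cat/
# apple/fish/car? -> apple/fish/car
# plain/name      -> plain/name

# Objects returned by the prefixed list request are then filtered against
# the full glob before their sizes are recorded.
print(fnmatch.fnmatch('cow/cat/fish', 'cow/cat/*'))          # True
print(fnmatch.fnmatch('apple/dish/cat', 'apple/fish/car?'))  # False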
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 95c8e58..9d44e17 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -652,6 +652,44 @@ class TestGCSIO(unittest.TestCase):
     self.assertEqual(
         set(self.gcs.glob(file_pattern)), set(expected_file_names))
 
+  def test_size_of_files_in_glob(self):
+    bucket_name = 'gcsio-test'
+    object_names = [
+        ('cow/cat/fish', 2),
+        ('cow/cat/blubber', 3),
+        ('cow/dog/blubber', 4),
+        ('apple/dog/blubber', 5),
+        ('apple/fish/blubber', 6),
+        ('apple/fish/blowfish', 7),
+        ('apple/fish/bambi', 8),
+        ('apple/fish/balloon', 9),
+        ('apple/fish/cat', 10),
+        ('apple/fish/cart', 11),
+        ('apple/fish/carl', 12),
+        ('apple/dish/bat', 13),
+        ('apple/dish/cat', 14),
+        ('apple/dish/carl', 15),
+    ]
+    for (object_name, size) in object_names:
+      file_name = 'gs://%s/%s' % (bucket_name, object_name)
+      self._insert_random_file(self.client, file_name, size)
+    test_cases = [
+        ('gs://gcsio-test/cow/*', [
+            ('cow/cat/fish', 2),
+            ('cow/cat/blubber', 3),
+            ('cow/dog/blubber', 4),
+        ]),
+        ('gs://gcsio-test/apple/fish/car?', [
+            ('apple/fish/cart', 11),
+            ('apple/fish/carl', 12),
+        ])
+    ]
+    for file_pattern, expected_object_names in test_cases:
+      expected_file_sizes = {'gs://%s/%s' % (bucket_name, o): s
+                             for (o, s) in expected_object_names}
+      self.assertEqual(
+          self.gcs.size_of_files_in_glob(file_pattern), expected_file_sizes)
+
 
 class TestPipeStream(unittest.TestCase):
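
Taken together, the caller-facing effect of this change is that one batched
list request replaces one size request per matching file. A hedged usage
sketch, where the bucket and pattern are hypothetical and a real run needs
GCS credentials:

from apache_beam.io import fileio

# One list request to GCS instead of one stat per matching file.
sizes = fileio.ChannelFactory.size_of_files_in_glob('gs://my-bucket/logs/*')
total_bytes = sum(sizes.values())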