beam-commits mailing list archives

From da...@apache.org
Subject [1/2] incubator-beam git commit: Improve the speed of getting file sizes
Date Tue, 29 Nov 2016 01:41:30 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 1530a1727 -> ad4dc87a4


Improve the speed of getting file sizes


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/7a059d37
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/7a059d37
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/7a059d37

Branch: refs/heads/python-sdk
Commit: 7a059d37e71b62702e8cdeafec6956fc7e1e38c4
Parents: 1530a17
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Mon Nov 21 15:50:21 2016 -0800
Committer: Davor Bonaci <davor@google.com>
Committed: Mon Nov 28 17:40:37 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py | 42 ++++++++++++++--------
 sdks/python/apache_beam/io/fileio.py          | 31 ++++++++++++++++
 sdks/python/apache_beam/io/fileio_test.py     | 41 +++++++++++++++++++++
 sdks/python/apache_beam/io/gcsio.py           | 25 +++++++++++++
 sdks/python/apache_beam/io/gcsio_test.py      | 38 ++++++++++++++++++++
 5 files changed, 163 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 7d8f686..14eaf27 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -107,7 +107,8 @@ class FileBasedSource(iobase.BoundedSource):
     if self._concat_source is None:
       single_file_sources = []
       file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
-      sizes = FileBasedSource._estimate_sizes_in_parallel(file_names)
+      sizes = FileBasedSource._estimate_sizes_of_files(file_names,
+                                                       self._pattern)
 
       # We create a reference for FileBasedSource that will be serialized along
       # with each _SingleFileSource. To prevent this FileBasedSource from having
@@ -144,22 +145,32 @@ class FileBasedSource(iobase.BoundedSource):
         compression_type=self._compression_type)
 
   @staticmethod
-  def _estimate_sizes_in_parallel(file_names):
+  def _estimate_sizes_of_files(file_names, pattern=None):
+    """Returns the size of all the files as an ordered list based on the file
+    names that are provided here. If the pattern is specified here then we use
+    the size_of_files_in_glob method to get the size of files matching the glob
+    for performance improvements instead of getting the size one by one.
+    """
     if not file_names:
       return []
     elif len(file_names) == 1:
       return [fileio.ChannelFactory.size_in_bytes(file_names[0])]
     else:
-      # ThreadPool crashes in old versions of Python (< 2.7.5) if created from a
-      # child thread. (http://bugs.python.org/issue10015)
-      if not hasattr(threading.current_thread(), '_children'):
-        threading.current_thread()._children = weakref.WeakKeyDictionary()
-      pool = ThreadPool(
-          min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
-      try:
-        return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
-      finally:
-        pool.terminate()
+      if pattern is None:
+        # ThreadPool crashes in old versions of Python (< 2.7.5) if created
+        # from a child thread. (http://bugs.python.org/issue10015)
+        if not hasattr(threading.current_thread(), '_children'):
+          threading.current_thread()._children = weakref.WeakKeyDictionary()
+        pool = ThreadPool(
+            min(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION, len(file_names)))
+        try:
+          return pool.map(fileio.ChannelFactory.size_in_bytes, file_names)
+        finally:
+          pool.terminate()
+      else:
+        file_sizes = fileio.ChannelFactory.size_of_files_in_glob(pattern,
+                                                                 file_names)
+        return [file_sizes[f] for f in file_names]
 
   def _validate(self):
     """Validate if there are actual files in the specified glob pattern
@@ -179,7 +190,10 @@ class FileBasedSource(iobase.BoundedSource):
     file_names = [f for f in fileio.ChannelFactory.glob(self._pattern)]
     if (len(file_names) <=
         FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT):
-      return sum(self._estimate_sizes_in_parallel(file_names))
+      # We are reading very few files, so pass the names without the
+      # pattern; otherwise the glob-based optimization would fetch sizes
+      # for every file matching the pattern, far more than the few needed.
+      return sum(self._estimate_sizes_of_files(file_names))
     else:
       # Estimating size of a random sample.
       # TODO: better support distributions where file sizes are not
@@ -188,7 +202,7 @@ class FileBasedSource(iobase.BoundedSource):
                         int(len(file_names) *
                             FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT))
       sample = random.sample(file_names, sample_size)
-      estimate = self._estimate_sizes_in_parallel(sample)
+      estimate = self._estimate_sizes_of_files(sample, self._pattern)
       return int(
           sum(estimate) *
           (float(len(file_names)) / len(sample)))

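For reference, the sampling-based size estimation in the last hunk above amounts to the following standalone sketch. The default constants and the size_fn callback are hypothetical stand-ins for FileBasedSource.MIN_NUMBER_OF_FILES_TO_STAT, FileBasedSource.MIN_FRACTION_OF_FILES_TO_STAT, and ChannelFactory.size_in_bytes:

    import random

    def estimate_total_size(file_names, size_fn,
                            min_files_to_stat=100, min_fraction=0.01):
        # Stat every file when there are only a few; otherwise stat a
        # random sample and scale the sampled total up to the full count.
        if len(file_names) <= min_files_to_stat:
            return sum(size_fn(f) for f in file_names)
        sample_size = max(min_files_to_stat,
                          int(len(file_names) * min_fraction))
        sample = random.sample(file_names, sample_size)
        return int(sum(size_fn(f) for f in sample) *
                   (float(len(file_names)) / len(sample)))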
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 30044c3..c71a730 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -606,6 +606,37 @@ class ChannelFactory(object):
     else:
       return os.path.getsize(path)
 
+  @staticmethod
+  def size_of_files_in_glob(path, file_names=None):
+    """Returns a map of file names to sizes.
+
+    Args:
+      path: a file path pattern matching the files whose sizes to fetch.
+      file_names: optional list of file names to get sizes for. This supports
+        eventually consistent sources, where two expansions of the same glob
+        may yield different files.
+    """
+    if path.startswith('gs://'):
+      file_sizes = gcsio.GcsIO().size_of_files_in_glob(path)
+      if file_names is None:
+        return file_sizes
+      else:
+        result = {}
+        # The GCS list API is eventually consistent, so the glob listing
+        # may be missing some of the requested files; fall back to a direct
+        # size call for any file the listing did not return.
+        for file_name in file_names:
+          if file_name in file_sizes:
+            result[file_name] = file_sizes[file_name]
+          else:
+            result[file_name] = ChannelFactory.size_in_bytes(file_name)
+        return result
+    else:
+      if file_names is None:
+        file_names = ChannelFactory.glob(path)
+      return {file_name: ChannelFactory.size_in_bytes(file_name)
+              for file_name in file_names}
+
 
 class _CompressedFile(object):
   """Somewhat limited file wrapper for easier handling of compressed files."""

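The new ChannelFactory.size_of_files_in_glob turns N per-file size calls into a single glob listing, with per-file lookups only for names the eventually consistent listing missed. A minimal sketch of that backfill step, with hypothetical listed_sizes and stat_fn arguments standing in for the GCS listing result and ChannelFactory.size_in_bytes:

    def backfill_sizes(file_names, listed_sizes, stat_fn):
        # listed_sizes: {name: size} from one glob listing, possibly missing
        # entries because GCS object listing is eventually consistent.
        # stat_fn: per-file size lookup, used only for missing names.
        result = {}
        for name in file_names:
            if name in listed_sizes:
                result[name] = listed_sizes[name]
            else:
                result[name] = stat_fn(name)
        return result

    # The listing returned only one of the two requested files; the other
    # size comes from the per-file fallback.
    sizes = backfill_sizes(['gs://bucket/file1', 'gs://bucket/file2'],
                           {'gs://bucket/file1': 1},
                           lambda name: 2)
    assert sizes == {'gs://bucket/file1': 1, 'gs://bucket/file2': 2}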
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/fileio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio_test.py b/sdks/python/apache_beam/io/fileio_test.py
index 098ace1..a68d484 100644
--- a/sdks/python/apache_beam/io/fileio_test.py
+++ b/sdks/python/apache_beam/io/fileio_test.py
@@ -582,6 +582,47 @@ class TestTextFileSource(unittest.TestCase):
     self.progress_with_offsets(lines, start_offset=14)
     self.progress_with_offsets(lines, start_offset=20, end_offset=20)
 
+  @mock.patch('apache_beam.io.fileio.gcsio')
+  def test_size_of_files_in_glob_complete(self, *unused_args):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    fileio.gcsio.GcsIO = lambda: gcsio_mock
+    file_names = ['gs://bucket/file1', 'gs://bucket/file2']
+    gcsio_mock.size_of_files_in_glob.return_value = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    expected_results = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    self.assertEqual(
+        fileio.ChannelFactory.size_of_files_in_glob(
+            'gs://bucket/*', file_names),
+        expected_results)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+
+  @mock.patch('apache_beam.io.fileio.gcsio')
+  def test_size_of_files_in_glob_incomplete(self, *unused_args):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    fileio.gcsio.GcsIO = lambda: gcsio_mock
+    file_names = ['gs://bucket/file1', 'gs://bucket/file2']
+    gcsio_mock.size_of_files_in_glob.return_value = {
+        'gs://bucket/file1': 1
+    }
+    gcsio_mock.size.return_value = 2
+    expected_results = {
+        'gs://bucket/file1': 1,
+        'gs://bucket/file2': 2
+    }
+    self.assertEqual(
+        fileio.ChannelFactory.size_of_files_in_glob(
+            'gs://bucket/*', file_names),
+        expected_results)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+    gcsio_mock.size.assert_called_once_with('gs://bucket/file2')
+
 
 class TestNativeTextFileSink(unittest.TestCase):
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 4f310be..9adb946 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -353,6 +353,31 @@ class GcsIO(object):
         bucket=bucket, object=object_path)
     return self.client.objects.Get(request).size
 
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def size_of_files_in_glob(self, pattern):
+    """Returns the size of all the files in the glob as a dictionary
+
+    Args:
+      path: a file path pattern that reads the size of all the files
+    """
+    bucket, name_pattern = parse_gcs_path(pattern)
+    # Get the prefix with which we can list objects in the given bucket.
+    prefix = re.match('^[^[*?]*', name_pattern).group(0)
+    request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
+    file_sizes = {}
+    while True:
+      response = self.client.objects.List(request)
+      for item in response.items:
+        if fnmatch.fnmatch(item.name, name_pattern):
+          file_name = 'gs://%s/%s' % (item.bucket, item.name)
+          file_sizes[file_name] = item.size
+      if response.nextPageToken:
+        request.pageToken = response.nextPageToken
+      else:
+        break
+    return file_sizes
+
 
 class GcsBufferedReader(object):
   """A class for reading Google Cloud Storage files."""

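GcsIO.size_of_files_in_glob narrows the objects.List request with a prefix derived from the glob: re.match('^[^[*?]*', name_pattern) keeps the longest leading run of the pattern containing no glob metacharacters ('[', '*', '?'), and fnmatch then filters the listed object names against the full pattern. A small self-contained illustration:

    import fnmatch
    import re

    def glob_prefix(name_pattern):
        # Longest leading substring free of glob metacharacters; usable as
        # the 'prefix' argument of a GCS objects.List request.
        return re.match('^[^[*?]*', name_pattern).group(0)

    assert glob_prefix('cow/*') == 'cow/'
    assert glob_prefix('apple/fish/car?') == 'apple/fish/car'
    assert fnmatch.fnmatch('apple/fish/cart', 'apple/fish/car?')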
http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/7a059d37/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 95c8e58..9d44e17 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -652,6 +652,44 @@ class TestGCSIO(unittest.TestCase):
       self.assertEqual(
           set(self.gcs.glob(file_pattern)), set(expected_file_names))
 
+  def test_size_of_files_in_glob(self):
+    bucket_name = 'gcsio-test'
+    object_names = [
+        ('cow/cat/fish', 2),
+        ('cow/cat/blubber', 3),
+        ('cow/dog/blubber', 4),
+        ('apple/dog/blubber', 5),
+        ('apple/fish/blubber', 6),
+        ('apple/fish/blowfish', 7),
+        ('apple/fish/bambi', 8),
+        ('apple/fish/balloon', 9),
+        ('apple/fish/cat', 10),
+        ('apple/fish/cart', 11),
+        ('apple/fish/carl', 12),
+        ('apple/dish/bat', 13),
+        ('apple/dish/cat', 14),
+        ('apple/dish/carl', 15),
+    ]
+    for (object_name, size) in object_names:
+      file_name = 'gs://%s/%s' % (bucket_name, object_name)
+      self._insert_random_file(self.client, file_name, size)
+    test_cases = [
+        ('gs://gcsio-test/cow/*', [
+            ('cow/cat/fish', 2),
+            ('cow/cat/blubber', 3),
+            ('cow/dog/blubber', 4),
+        ]),
+        ('gs://gcsio-test/apple/fish/car?', [
+            ('apple/fish/cart', 11),
+            ('apple/fish/carl', 12),
+        ])
+    ]
+    for file_pattern, expected_object_names in test_cases:
+      expected_file_sizes = {'gs://%s/%s' % (bucket_name, o): s
+                             for (o, s) in expected_object_names}
+      self.assertEqual(
+          self.gcs.size_of_files_in_glob(file_pattern), expected_file_sizes)
+
 
 class TestPipeStream(unittest.TestCase):
 

