beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From chamik...@apache.org
Subject [1/2] beam git commit: [BEAM-1892] File size estimation process reporting
Date Thu, 06 Apr 2017 18:32:46 GMT
Repository: beam
Updated Branches:
  refs/heads/master f87597e10 -> 37e4cc1b8


[BEAM-1892] File size estimation process reporting


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/378b3f5b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/378b3f5b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/378b3f5b

Branch: refs/heads/master
Commit: 378b3f5bf9886cc390e674a3e600a22ad2e5cb98
Parents: f87597e
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Wed Apr 5 16:49:05 2017 -0700
Committer: Chamikara Jayalath <chamikara@google.com>
Committed: Thu Apr 6 11:29:10 2017 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py   |  1 +
 sdks/python/apache_beam/io/gcp/gcsfilesystem.py |  2 +-
 .../apache_beam/io/gcp/gcsfilesystem_test.py    | 29 ++++++++++++++-
 sdks/python/apache_beam/io/gcp/gcsio.py         | 16 +++++++-
 sdks/python/apache_beam/io/gcp/gcsio_test.py    | 39 ++++++++++++++++++++
 5 files changed, 83 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 930d958..2e7043f 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -35,6 +35,7 @@ from apache_beam.transforms.display import DisplayDataItem
 from apache_beam.utils.value_provider import ValueProvider
 from apache_beam.utils.value_provider import StaticValueProvider
 from apache_beam.utils.value_provider import check_accessible
+
 MAX_NUM_THREADS_FOR_SIZE_ESTIMATION = 25
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
index 5aef0ab..d79630f 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem.py
@@ -65,7 +65,7 @@ class GCSFileSystem(FileSystem):
       """
       if pattern.endswith('/'):
         pattern += '*'
-      file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern)
+      file_sizes = gcsio.GcsIO().size_of_files_in_glob(pattern, limit)
       metadata_list = [FileMetadata(path, size)
                        for path, size in file_sizes.iteritems()]
       return MatchResult(pattern, metadata_list)

http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
index 73a3893..5a1f10d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsfilesystem_test.py
@@ -54,7 +54,31 @@ class GCSFileSystemTest(unittest.TestCase):
     self.assertEqual(
         set(match_result.metadata_list),
         expected_results)
-    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with(
+        'gs://bucket/*', None)
+
+  @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
+  def test_match_multiples_limit(self, mock_gcsio):
+    # Prepare mocks.
+    gcsio_mock = mock.MagicMock()
+    limit = 1
+    gcsfilesystem.gcsio.GcsIO = lambda: gcsio_mock
+    gcsio_mock.size_of_files_in_glob.return_value = {
+        'gs://bucket/file1': 1
+    }
+    expected_results = set([
+        FileMetadata('gs://bucket/file1', 1)
+    ])
+    file_system = gcsfilesystem.GCSFileSystem()
+    match_result = file_system.match(['gs://bucket/'], [limit])[0]
+    self.assertEqual(
+        set(match_result.metadata_list),
+        expected_results)
+    self.assertEqual(
+        len(match_result.metadata_list),
+        limit)
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with(
+        'gs://bucket/*', 1)
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiples_error(self, mock_gcsio):
@@ -71,7 +95,8 @@ class GCSFileSystemTest(unittest.TestCase):
     self.assertTrue(
         error.exception.message.startswith('Match operation failed'))
     self.assertEqual(error.exception.exception_details, expected_results)
-    gcsio_mock.size_of_files_in_glob.assert_called_once_with('gs://bucket/*')
+    gcsio_mock.size_of_files_in_glob.assert_called_once_with(
+        'gs://bucket/*', None)
 
   @mock.patch('apache_beam.io.gcp.gcsfilesystem.gcsio')
   def test_match_multiple_patterns(self, mock_gcsio):

http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio.py b/sdks/python/apache_beam/io/gcp/gcsio.py
index 0a10094..c76c99d 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio.py
@@ -29,6 +29,7 @@ import os
 import Queue
 import re
 import threading
+import time
 import traceback
 
 from apache_beam.utils import retry
@@ -368,7 +369,7 @@ class GcsIO(object):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def size_of_files_in_glob(self, pattern):
+  def size_of_files_in_glob(self, pattern, limit=None):
     """Returns the size of all the files in the glob as a dictionary
 
     Args:
@@ -379,16 +380,29 @@ class GcsIO(object):
     prefix = re.match('^[^[*?]*', name_pattern).group(0)
     request = storage.StorageObjectsListRequest(bucket=bucket, prefix=prefix)
     file_sizes = {}
+    counter = 0
+    start_time = time.time()
+    logging.info("Starting the size estimation of the input")
     while True:
       response = self.client.objects.List(request)
       for item in response.items:
         if fnmatch.fnmatch(item.name, name_pattern):
           file_name = 'gs://%s/%s' % (item.bucket, item.name)
           file_sizes[file_name] = item.size
+        counter += 1
+        if limit is not None and counter >= limit:
+          break
+        if counter % 10000 == 0:
+          logging.info("Finished computing size of: %s files", len(file_sizes))
       if response.nextPageToken:
         request.pageToken = response.nextPageToken
+        if limit is not None and len(file_sizes) >= limit:
+          break
       else:
         break
+    logging.info(
+        "Finished the size estimation of the input at %s files. " +\
+        "Estimation took %s seconds", counter, time.time() - start_time)
     return file_sizes
 
 

http://git-wip-us.apache.org/repos/asf/beam/blob/378b3f5b/sdks/python/apache_beam/io/gcp/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcp/gcsio_test.py b/sdks/python/apache_beam/io/gcp/gcsio_test.py
index c028f0d..73d2213 100644
--- a/sdks/python/apache_beam/io/gcp/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcp/gcsio_test.py
@@ -747,6 +747,45 @@ class TestGCSIO(unittest.TestCase):
       self.assertEqual(
           self.gcs.size_of_files_in_glob(file_pattern), expected_file_sizes)
 
+  def test_size_of_files_in_glob_limited(self):
+    bucket_name = 'gcsio-test'
+    object_names = [
+        ('cow/cat/fish', 2),
+        ('cow/cat/blubber', 3),
+        ('cow/dog/blubber', 4),
+        ('apple/dog/blubber', 5),
+        ('apple/fish/blubber', 6),
+        ('apple/fish/blowfish', 7),
+        ('apple/fish/bambi', 8),
+        ('apple/fish/balloon', 9),
+        ('apple/fish/cat', 10),
+        ('apple/fish/cart', 11),
+        ('apple/fish/carl', 12),
+        ('apple/dish/bat', 13),
+        ('apple/dish/cat', 14),
+        ('apple/dish/carl', 15),
+    ]
+    for (object_name, size) in object_names:
+      file_name = 'gs://%s/%s' % (bucket_name, object_name)
+      self._insert_random_file(self.client, file_name, size)
+    test_cases = [
+        ('gs://gcsio-test/cow/*', [
+            ('cow/cat/fish', 2),
+            ('cow/cat/blubber', 3),
+            ('cow/dog/blubber', 4),
+        ]),
+        ('gs://gcsio-test/apple/fish/car?', [
+            ('apple/fish/cart', 11),
+            ('apple/fish/carl', 12),
+        ])
+    ]
+    # Check if limits are followed correctly
+    limit = 1
+    for file_pattern, expected_object_names in test_cases:
+      expected_num_items = min(len(expected_object_names), limit)
+      self.assertEqual(
+          len(self.gcs.glob(file_pattern, limit)), expected_num_items)
+
 
 @unittest.skipIf(HttpError is None, 'GCP dependencies are not installed')
 class TestPipeStream(unittest.TestCase):


Mime
View raw message