beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Do not need to list all files in GCS for validation. Add limit field to fileIO
Date Sat, 03 Dec 2016 00:13:35 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk 2363ee510 -> fd6a52c15


Do not need to list all files in GCS for validation. Add limit field to fileIO


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/16886904
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/16886904
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/16886904

Branch: refs/heads/python-sdk
Commit: 16886904df9fd1d3f92e1f7aabd134a28d6c1c00
Parents: 2363ee5
Author: Sourabh Bajaj <sourabhbajaj@google.com>
Authored: Fri Dec 2 13:56:42 2016 -0800
Committer: Sourabh Bajaj <sourabhbajaj@google.com>
Committed: Fri Dec 2 13:56:42 2016 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py | 3 ++-
 sdks/python/apache_beam/io/fileio.py          | 7 ++++---
 sdks/python/apache_beam/io/gcsio.py           | 6 ++++--
 sdks/python/apache_beam/io/gcsio_test.py      | 7 +++++++
 4 files changed, 17 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index 14c2b06..8921801 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -175,7 +175,8 @@ class FileBasedSource(iobase.BoundedSource):
   def _validate(self):
     """Validate if there are actual files in the specified glob pattern
     """
-    if len(fileio.ChannelFactory.glob(self._pattern)) <= 0:
+    # Cap the listing at one match: validation only needs to know a file exists
+    if len(fileio.ChannelFactory.glob(self._pattern, limit=1)) <= 0:
       raise IOError(
           'No files found based on the file pattern %s' % self._pattern)
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index c71a730..82e7813 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -588,11 +588,12 @@ class ChannelFactory(object):
         raise IOError(err)
 
   @staticmethod
-  def glob(path):
+  def glob(path, limit=None):
     if path.startswith('gs://'):
-      return gcsio.GcsIO().glob(path)
+      return gcsio.GcsIO().glob(path, limit)
     else:
-      return glob.glob(path)
+      files = glob.glob(path)
+      return files[:limit]
 
   @staticmethod
   def size_in_bytes(path):

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 9adb946..748465f 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -142,7 +142,7 @@ class GcsIO(object):
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)
-  def glob(self, pattern):
+  def glob(self, pattern, limit=None):
     """Return the GCS path names matching a given path name pattern.
 
     Path name patterns are those recognized by fnmatch.fnmatch().  The path
@@ -166,9 +166,11 @@ class GcsIO(object):
           object_paths.append('gs://%s/%s' % (item.bucket, item.name))
       if response.nextPageToken:
         request.pageToken = response.nextPageToken
+        if limit is not None and len(object_paths) >= limit:
+          break
       else:
         break
-    return object_paths
+    return object_paths[:limit]
 
   @retry.with_exponential_backoff(
       retry_filter=retry.retry_on_server_errors_and_timeout_filter)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/16886904/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index 9d44e17..5af13c6 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -652,6 +652,13 @@ class TestGCSIO(unittest.TestCase):
       self.assertEqual(
           set(self.gcs.glob(file_pattern)), set(expected_file_names))
 
+    # Check that glob() returns at most `limit` results when a limit is given
+    limit = 3
+    for file_pattern, expected_object_names in test_cases:
+      expected_num_items = min(len(expected_object_names), limit)
+      self.assertEqual(
+          len(self.gcs.glob(file_pattern, limit)), expected_num_items)
+
   def test_size_of_files_in_glob(self):
     bucket_name = 'gcsio-test'
     object_names = [


Mime
View raw message