beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From rober...@apache.org
Subject [2/2] incubator-beam git commit: Fixes several issues related to 'filebasedsource'.
Date Fri, 15 Jul 2016 17:23:39 GMT
Fixes several issues related to 'filebasedsource'.

Adds a method 'fileio.ChannelFactory.size_in_bytes()' that can be used to determine the size
of a single file.
Implements this method for both GCS ('gs://') paths and local files, adding 'GcsIO.size()' for the GCS case.
Updates 'filebasedsource' to use this method when determining file sizes, instead of opening each file and seeking to its end.

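Fixes a small bug in 'OffsetRangeTracker': the position computed for a given fraction is now converted to an integer (previously 'math.ceil' could yield a float), and 'try_split' now asserts that the split offset is an integer.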


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/c9c31fdf
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/c9c31fdf
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/c9c31fdf

Branch: refs/heads/python-sdk
Commit: c9c31fdffbe2365a8dedd3154726ab1c01cfa889
Parents: d898d56
Author: Chamikara Jayalath <chamikara@apache.org>
Authored: Wed Jul 6 20:25:04 2016 -0700
Committer: Robert Bradshaw <robertwb@google.com>
Committed: Fri Jul 15 10:23:13 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/io/filebasedsource.py     |  9 +--------
 sdks/python/apache_beam/io/fileio.py              | 14 ++++++++++++++
 sdks/python/apache_beam/io/gcsio.py               | 15 +++++++++++++++
 sdks/python/apache_beam/io/gcsio_test.py          |  8 ++++++++
 sdks/python/apache_beam/io/range_trackers.py      |  3 ++-
 sdks/python/apache_beam/io/range_trackers_test.py |  3 +++
 6 files changed, 43 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/filebasedsource.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/filebasedsource.py b/sdks/python/apache_beam/io/filebasedsource.py
index c877e44..aa0820d 100644
--- a/sdks/python/apache_beam/io/filebasedsource.py
+++ b/sdks/python/apache_beam/io/filebasedsource.py
@@ -26,7 +26,6 @@ For an example implementation of ``FileBasedSource`` see ``avroio.AvroSource``.
 """
 
 from multiprocessing.pool import ThreadPool
-import os
 import range_trackers
 
 from apache_beam.io import fileio
@@ -131,13 +130,7 @@ class FileBasedSource(iobase.BoundedSource):
   def _estimate_sizes_in_parallel(file_names):
 
     def _calculate_size_of_file(file_name):
-      f = fileio.ChannelFactory.open(
-          file_name, 'rb', 'application/octet-stream')
-      try:
-        f.seek(0, os.SEEK_END)
-        return f.tell()
-      finally:
-        f.close()
+      return fileio.ChannelFactory.size_in_bytes(file_name)
 
     return ThreadPool(MAX_NUM_THREADS_FOR_SIZE_ESTIMATION).map(
         _calculate_size_of_file, file_names)

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/fileio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/fileio.py b/sdks/python/apache_beam/io/fileio.py
index 31b6a93..f532077 100644
--- a/sdks/python/apache_beam/io/fileio.py
+++ b/sdks/python/apache_beam/io/fileio.py
@@ -255,6 +255,20 @@ class ChannelFactory(object):
     else:
       return glob.glob(path)
 
+  @staticmethod
+  def size_in_bytes(path):
+    """Returns the size of a file in bytes.
+
+    Args:
+      path: a string that gives the path of a single file.
+    """
+    if path.startswith('gs://'):
+      # pylint: disable=wrong-import-order, wrong-import-position
+      from apache_beam.io import gcsio
+      return gcsio.GcsIO().size(path)
+    else:
+      return os.path.getsize(path)
+
 
 class _CompressionType(object):
   """Object representing single compression type."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index c61f251..10409c9 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -236,6 +236,21 @@ class GcsIO(object):
     except IOError:
       return False
 
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
+  def size(self, path):
+    """Returns the size of a single GCS object.
+
+    This method does not perform glob expansion. Hence the given path must be
+    for a single GCS object.
+
+    Returns: size of the GCS object in bytes.
+    """
+    bucket, object_path = parse_gcs_path(path)
+    request = storage.StorageObjectsGetRequest(bucket=bucket,
+                                               object=object_path)
+    return self.client.objects.Get(request).size
+
 
 class GcsBufferedReader(object):
   """A class for reading Google Cloud Storage files."""

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index eeabb1a..7b15ef3 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -189,6 +189,14 @@ class TestGCSIO(unittest.TestCase):
     self.client = FakeGcsClient()
     self.gcs = gcsio.GcsIO(self.client)
 
+  def test_size(self):
+    file_name = 'gs://gcsio-test/dummy_file'
+    file_size = 1234
+
+    self._insert_random_file(self.client, file_name, file_size)
+    self.assertTrue(self.gcs.exists(file_name))
+    self.assertEqual(1234, self.gcs.size(file_name))
+
   def test_delete(self):
     file_name = 'gs://gcsio-test/delete_me'
     file_size = 1024

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/range_trackers.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers.py b/sdks/python/apache_beam/io/range_trackers.py
index c3481de..a736162 100644
--- a/sdks/python/apache_beam/io/range_trackers.py
+++ b/sdks/python/apache_beam/io/range_trackers.py
@@ -114,6 +114,7 @@ class OffsetRangeTracker(iobase.RangeTracker):
       self._last_record_start = record_start
 
   def try_split(self, split_offset):
+    assert isinstance(split_offset, (int, long))
     with self._lock:
       if self._stop_offset == OffsetRangeTracker.OFFSET_INFINITY:
         logging.debug('refusing to split %r at %d: stop position unspecified',
@@ -163,7 +164,7 @@ class OffsetRangeTracker(iobase.RangeTracker):
       raise Exception(
           'get_position_for_fraction_consumed is not applicable for an '
           'unbounded range')
-    return (math.ceil(self.start_position() + fraction * (
+    return int(math.ceil(self.start_position() + fraction * (
         self.stop_position() - self.start_position())))
 
 

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/c9c31fdf/sdks/python/apache_beam/io/range_trackers_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/range_trackers_test.py b/sdks/python/apache_beam/io/range_trackers_test.py
index ceeccd5..77733d3 100644
--- a/sdks/python/apache_beam/io/range_trackers_test.py
+++ b/sdks/python/apache_beam/io/range_trackers_test.py
@@ -98,6 +98,9 @@ class OffsetRangeTrackerTest(unittest.TestCase):
   def test_get_position_for_fraction_dense(self):
     # Represents positions 3, 4, 5.
     tracker = range_trackers.OffsetRangeTracker(3, 6)
+
+    # Position must be an integer type.
+    self.assertTrue(isinstance(tracker.position_at_fraction(0.0), (int, long)))
     # [3, 3) represents 0.0 of [3, 6)
     self.assertEqual(3, tracker.position_at_fraction(0.0))
     # [3, 4) represents up to 1/3 of [3, 6)


Mime
View raw message