beam-commits mailing list archives

From: al...@apache.org
Subject: [1/2] beam git commit: Retry GCS partial-file read when API call hangs
Date: Sat, 04 Feb 2017 01:14:33 GMT
Repository: beam
Updated Branches:
  refs/heads/master fbd69dcf7 -> 6e220bb37


Retry GCS partial-file read when API call hangs
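
The fix wraps each partial-file read in a daemon worker thread and waits on a
Queue with a timeout. If the GCS API call hangs past `segment_timeout` seconds,
the reader abandons the thread, reinitializes the downloader, and retries, up
to a fixed retry limit. A minimal, self-contained Python 2 sketch of that
watchdog pattern (`call_with_timeout_and_retry` and `max_retries` are
illustrative names, not part of the Beam API):

    import Queue
    import threading

    def _run(func, args, queue):
      # Runs in a worker thread; reports (result, exception) on its own queue.
      try:
        queue.put((func(*args), None))
      except Exception as e:  # pylint: disable=broad-except
        queue.put((None, e))

    def call_with_timeout_and_retry(func, args, timeout, max_retries=10):
      # Illustrative sketch, not the Beam implementation.
      retry_count = 0
      while retry_count <= max_retries:
        queue = Queue.Queue()
        t = threading.Thread(target=_run, args=(func, args, queue))
        t.daemon = True  # A hung worker must not block interpreter exit.
        t.start()
        try:
          result, exn = queue.get(timeout=timeout)
        except Queue.Empty:
          retry_count += 1  # The call hung; abandon its thread and retry.
          continue
        if exn:
          raise exn
        return result
      raise IOError('Reached retry limit.')

Passing the queue to the thread as an argument (rather than closing over a
loop variable) keeps each attempt's result isolated to its own queue, which is
the same approach the change below takes.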


Project: http://git-wip-us.apache.org/repos/asf/beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2d8d71b
Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2d8d71b
Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2d8d71b

Branch: refs/heads/master
Commit: c2d8d71b2d3c50779bef56bd92160aa9c9690ad5
Parents: fbd69dc
Author: Charles Chen <ccy@google.com>
Authored: Fri Feb 3 13:14:45 2017 -0800
Committer: Ahmet Altay <altay@google.com>
Committed: Fri Feb 3 17:02:15 2017 -0800

----------------------------------------------------------------------
 sdks/python/apache_beam/io/gcsio.py      | 71 +++++++++++++++++++++++----
 sdks/python/apache_beam/io/gcsio_test.py | 38 ++++++++++++++
 2 files changed, 99 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/beam/blob/c2d8d71b/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index 0d18cc0..f29776b 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -26,6 +26,7 @@ import fnmatch
 import logging
 import multiprocessing
 import os
+import Queue
 import re
 import threading
 import traceback
@@ -63,6 +64,10 @@ except ImportError:
 # +---------------+------------+-------------+-------------+-------------+
 DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024
 
+# This is the number of seconds the library will wait for a partial-file read
+# operation from GCS to complete before retrying.
+DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60
+
 # This is the size of chunks used when writing to GCS.
 WRITE_CHUNK_SIZE = 8 * 1024 * 1024
 
@@ -393,18 +398,20 @@ class GcsBufferedReader(object):
                client,
                path,
                mode='r',
-               buffer_size=DEFAULT_READ_BUFFER_SIZE):
+               buffer_size=DEFAULT_READ_BUFFER_SIZE,
+               segment_timeout=DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS):
     self.client = client
     self.path = path
     self.bucket, self.name = parse_gcs_path(path)
     self.mode = mode
     self.buffer_size = buffer_size
+    self.segment_timeout = segment_timeout
 
     # Get object state.
-    get_request = (storage.StorageObjectsGetRequest(
+    self.get_request = (storage.StorageObjectsGetRequest(
         bucket=self.bucket, object=self.name))
     try:
-      metadata = self._get_object_metadata(get_request)
+      metadata = self._get_object_metadata(self.get_request)
     except HttpError as http_error:
       if http_error.status_code == 404:
         raise IOError(errno.ENOENT, 'Not found: %s' % self.path)
@@ -415,13 +422,13 @@ class GcsBufferedReader(object):
     self.size = metadata.size
 
     # Ensure read is from file of the correct generation.
-    get_request.generation = metadata.generation
+    self.get_request.generation = metadata.generation
 
     # Initialize read buffer state.
     self.download_stream = cStringIO.StringIO()
     self.downloader = transfer.Download(
-        self.download_stream, auto_transfer=False, chunksize=buffer_size)
-    self.client.objects.Get(get_request, download=self.downloader)
+        self.download_stream, auto_transfer=False, chunksize=self.buffer_size)
+    self.client.objects.Get(self.get_request, download=self.downloader)
     self.position = 0
     self.buffer = ''
     self.buffer_start_position = 0
@@ -539,7 +546,47 @@ class GcsBufferedReader(object):
         self.buffer_start_position + len(self.buffer) <= self.position):
       bytes_to_request = min(self._remaining(), self.buffer_size)
       self.buffer_start_position = self.position
-      self.buffer = self._get_segment(self.position, bytes_to_request)
+      retry_count = 0
+      while retry_count <= 10:  # Up to 10 retries after the first attempt.
+        queue = Queue.Queue()
+        t = threading.Thread(target=self._fetch_to_queue,
+                             args=(queue, self._get_segment,
+                                   (self.position, bytes_to_request)))
+        t.daemon = True
+        t.start()
+        try:
+          result, exn, tb = queue.get(timeout=self.segment_timeout)
+        except Queue.Empty:
+          logging.warning(
+              ('Timed out fetching %d bytes from position %d of %s after %f '
+               'seconds; retrying...'), bytes_to_request, self.position,
+              self.path, self.segment_timeout)
+          retry_count += 1
+          # Reinitialize download objects.
+          self.download_stream = cStringIO.StringIO()
+          self.downloader = transfer.Download(
+              self.download_stream, auto_transfer=False,
+              chunksize=self.buffer_size)
+          self.client.objects.Get(self.get_request, download=self.downloader)
+          continue
+        if exn:
+          logging.error(
+              ('Exception while fetching %d bytes from position %d of %s: '
+               '%s\n%s'),
+              bytes_to_request, self.position, self.path, exn, tb)
+          raise exn
+        self.buffer = result
+        return
+      raise GcsIOError(
+          'Reached retry limit for _fetch_next_if_buffer_exhausted.')
+
+  def _fetch_to_queue(self, queue, func, args):
+    try:
+      value = func(*args)
+      queue.put((value, None, None))
+    except Exception as e:  # pylint: disable=broad-except
+      tb = traceback.format_exc()
+      queue.put((None, e, tb))
 
   def _remaining(self):
     return self.size - self.position
@@ -555,11 +602,15 @@ class GcsBufferedReader(object):
     """Get the given segment of the current GCS file."""
     if size == 0:
       return ''
+    # The objects self.downloader and self.download_stream may be recreated if
+    # this call times out; we save them locally to avoid any threading issues.
+    downloader = self.downloader
+    download_stream = self.download_stream
     end = start + size - 1
-    self.downloader.GetRange(start, end)
-    value = self.download_stream.getvalue()
+    downloader.GetRange(start, end)
+    value = download_stream.getvalue()
     # Clear the cStringIO object after we've read its contents.
-    self.download_stream.truncate(0)
+    download_stream.truncate(0)
     assert len(value) == size
     return value
 

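For reference, a hypothetical usage sketch of the new timeout knob. It mirrors
how the test below exercises it by assigning the `segment_timeout` attribute
directly on the reader; the `GcsIO` construction and the bucket/object names
are assumptions for illustration:

    # Hypothetical usage; client construction and paths are illustrative.
    from apache_beam.io import gcsio

    gcs = gcsio.GcsIO()
    f = gcs.open('gs://my-bucket/my-object')
    f.segment_timeout = 30  # Override the 60-second default per reader.
    data = f.read()         # Hung GCS calls are now retried after 30s.
    f.close()
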
http://git-wip-us.apache.org/repos/asf/beam/blob/c2d8d71b/sdks/python/apache_beam/io/gcsio_test.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py
index bd7eb51..6532707 100644
--- a/sdks/python/apache_beam/io/gcsio_test.py
+++ b/sdks/python/apache_beam/io/gcsio_test.py
@@ -22,6 +22,7 @@ import multiprocessing
 import os
 import random
 import threading
+import time
 import unittest
 
 import httplib2
@@ -427,6 +428,43 @@ class TestGCSIO(unittest.TestCase):
     f.seek(0)
     self.assertEqual(f.read(), random_file.contents)
 
+  def test_flaky_file_read(self):
+    file_name = 'gs://gcsio-test/flaky_file'
+    file_size = 5 * 1024 * 1024 + 100
+    random_file = self._insert_random_file(self.client, file_name, file_size)
+    f = self.gcs.open(file_name)
+    random.seed(0)
+    f.buffer_size = 1024 * 1024
+    f.segment_timeout = 0.1
+    self.assertEqual(f.mode, 'r')
+    f._real_get_segment = f._get_segment
+
+    def flaky_get_segment(start, size):
+      if random.randint(0, 3) == 1:
+        time.sleep(600)
+      return f._real_get_segment(start, size)
+
+    f._get_segment = flaky_get_segment
+    self.assertEqual(f.read(), random_file.contents)
+
+    # Test exception handling in file read.
+    def failing_get_segment(unused_start, unused_size):
+      raise IOError('Could not read.')
+
+    f._get_segment = failing_get_segment
+    f.seek(0)
+    with self.assertRaises(IOError):
+      f.read()
+
+    # Test retry limit in hanging file read.
+    def hanging_get_segment(unused_start, unused_size):
+      time.sleep(600)
+
+    f._get_segment = hanging_get_segment
+    f.seek(0)
+    with self.assertRaises(gcsio.GcsIOError):
+      f.read()
+
   def test_file_random_seek(self):
     file_name = 'gs://gcsio-test/seek_file'
     file_size = 5 * 1024 * 1024 - 100

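One design point worth noting: each attempt allocates a fresh Queue and hands
it to the worker thread, so a late result from an abandoned, still-hanging
attempt can never be mistaken for the current attempt's result. A
self-contained Python 2 sketch of that property (delays and values are
illustrative):

    import Queue
    import threading
    import time

    def attempt(queue, delay, value):
      # Simulates one read attempt that takes `delay` seconds.
      def worker():
        time.sleep(delay)
        queue.put(value)
      t = threading.Thread(target=worker)
      t.daemon = True
      t.start()

    old_queue = Queue.Queue()
    attempt(old_queue, 5, 'stale')   # Hangs past the timeout below.
    try:
      old_queue.get(timeout=0.1)
    except Queue.Empty:
      pass                           # Abandon the first attempt.

    new_queue = Queue.Queue()        # A fresh queue for the retry.
    attempt(new_queue, 0, 'fresh')
    print new_queue.get(timeout=1)   # 'fresh'; the stale result is never read.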
