Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4B8CE200C33 for ; Sat, 4 Feb 2017 02:14:35 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 4A000160B55; Sat, 4 Feb 2017 01:14:35 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6C2CB160B3F for ; Sat, 4 Feb 2017 02:14:34 +0100 (CET) Received: (qmail 41744 invoked by uid 500); 4 Feb 2017 01:14:33 -0000 Mailing-List: contact commits-help@beam.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@beam.apache.org Delivered-To: mailing list commits@beam.apache.org Received: (qmail 41735 invoked by uid 99); 4 Feb 2017 01:14:33 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 04 Feb 2017 01:14:33 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 87502DFBE6; Sat, 4 Feb 2017 01:14:33 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: altay@apache.org To: commits@beam.apache.org Date: Sat, 04 Feb 2017 01:14:33 -0000 Message-Id: <1c0c744cf43c458eaab00a6f026b9c9a@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: [1/2] beam git commit: Retry GCS partial-file read when API call hangs archived-at: Sat, 04 Feb 2017 01:14:35 -0000 Repository: beam Updated Branches: refs/heads/master fbd69dcf7 -> 6e220bb37 Retry GCS partial-file read when API call hangs Project: http://git-wip-us.apache.org/repos/asf/beam/repo Commit: http://git-wip-us.apache.org/repos/asf/beam/commit/c2d8d71b Tree: http://git-wip-us.apache.org/repos/asf/beam/tree/c2d8d71b Diff: http://git-wip-us.apache.org/repos/asf/beam/diff/c2d8d71b Branch: refs/heads/master Commit: c2d8d71b2d3c50779bef56bd92160aa9c9690ad5 Parents: fbd69dc Author: Charles Chen Authored: Fri Feb 3 13:14:45 2017 -0800 Committer: Ahmet Altay Committed: Fri Feb 3 17:02:15 2017 -0800 ---------------------------------------------------------------------- sdks/python/apache_beam/io/gcsio.py | 71 +++++++++++++++++++++++---- sdks/python/apache_beam/io/gcsio_test.py | 38 ++++++++++++++ 2 files changed, 99 insertions(+), 10 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/beam/blob/c2d8d71b/sdks/python/apache_beam/io/gcsio.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py index 0d18cc0..f29776b 100644 --- a/sdks/python/apache_beam/io/gcsio.py +++ b/sdks/python/apache_beam/io/gcsio.py @@ -26,6 +26,7 @@ import fnmatch import logging import multiprocessing import os +import Queue import re import threading import traceback @@ -63,6 +64,10 @@ except ImportError: # +---------------+------------+-------------+-------------+-------------+ DEFAULT_READ_BUFFER_SIZE = 16 * 1024 * 1024 +# This is the number of seconds the library will wait for a partial-file read +# operation from GCS to complete before retrying. +DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS = 60 + # This is the size of chunks used when writing to GCS. WRITE_CHUNK_SIZE = 8 * 1024 * 1024 @@ -393,18 +398,20 @@ class GcsBufferedReader(object): client, path, mode='r', - buffer_size=DEFAULT_READ_BUFFER_SIZE): + buffer_size=DEFAULT_READ_BUFFER_SIZE, + segment_timeout=DEFAULT_READ_SEGMENT_TIMEOUT_SECONDS): self.client = client self.path = path self.bucket, self.name = parse_gcs_path(path) self.mode = mode self.buffer_size = buffer_size + self.segment_timeout = segment_timeout # Get object state. - get_request = (storage.StorageObjectsGetRequest( + self.get_request = (storage.StorageObjectsGetRequest( bucket=self.bucket, object=self.name)) try: - metadata = self._get_object_metadata(get_request) + metadata = self._get_object_metadata(self.get_request) except HttpError as http_error: if http_error.status_code == 404: raise IOError(errno.ENOENT, 'Not found: %s' % self.path) @@ -415,13 +422,13 @@ class GcsBufferedReader(object): self.size = metadata.size # Ensure read is from file of the correct generation. - get_request.generation = metadata.generation + self.get_request.generation = metadata.generation # Initialize read buffer state. self.download_stream = cStringIO.StringIO() self.downloader = transfer.Download( - self.download_stream, auto_transfer=False, chunksize=buffer_size) - self.client.objects.Get(get_request, download=self.downloader) + self.download_stream, auto_transfer=False, chunksize=self.buffer_size) + self.client.objects.Get(self.get_request, download=self.downloader) self.position = 0 self.buffer = '' self.buffer_start_position = 0 @@ -539,7 +546,47 @@ class GcsBufferedReader(object): self.buffer_start_position + len(self.buffer) <= self.position): bytes_to_request = min(self._remaining(), self.buffer_size) self.buffer_start_position = self.position - self.buffer = self._get_segment(self.position, bytes_to_request) + retry_count = 0 + while retry_count <= 10: + queue = Queue.Queue() + t = threading.Thread(target=self._fetch_to_queue, + args=(queue, self._get_segment, + (self.position, bytes_to_request))) + t.daemon = True + t.start() + try: + result, exn, tb = queue.get(timeout=self.segment_timeout) + except Queue.Empty: + logging.warning( + ('Timed out fetching %d bytes from position %d of %s after %f ' + 'seconds; retrying...'), bytes_to_request, self.position, + self.path, self.segment_timeout) + retry_count += 1 + # Reinitialize download objects. + self.download_stream = cStringIO.StringIO() + self.downloader = transfer.Download( + self.download_stream, auto_transfer=False, + chunksize=self.buffer_size) + self.client.objects.Get(self.get_request, download=self.downloader) + continue + if exn: + logging.error( + ('Exception while fetching %d bytes from position %d of %s: ' + '%s\n%s'), + bytes_to_request, self.position, self.path, exn, tb) + raise exn + self.buffer = result + return + raise GcsIOError( + 'Reached retry limit for _fetch_next_if_buffer_exhausted.') + + def _fetch_to_queue(self, queue, func, args): + try: + value = func(*args) + queue.put((value, None, None)) + except Exception as e: # pylint: disable=broad-except + tb = traceback.format_exc() + queue.put((None, e, tb)) def _remaining(self): return self.size - self.position @@ -555,11 +602,15 @@ class GcsBufferedReader(object): """Get the given segment of the current GCS file.""" if size == 0: return '' + # The objects self.downloader and self.download_stream may be recreated if + # this call times out; we save them locally to avoid any threading issues. + downloader = self.downloader + download_stream = self.download_stream end = start + size - 1 - self.downloader.GetRange(start, end) - value = self.download_stream.getvalue() + downloader.GetRange(start, end) + value = download_stream.getvalue() # Clear the cStringIO object after we've read its contents. - self.download_stream.truncate(0) + download_stream.truncate(0) assert len(value) == size return value http://git-wip-us.apache.org/repos/asf/beam/blob/c2d8d71b/sdks/python/apache_beam/io/gcsio_test.py ---------------------------------------------------------------------- diff --git a/sdks/python/apache_beam/io/gcsio_test.py b/sdks/python/apache_beam/io/gcsio_test.py index bd7eb51..6532707 100644 --- a/sdks/python/apache_beam/io/gcsio_test.py +++ b/sdks/python/apache_beam/io/gcsio_test.py @@ -22,6 +22,7 @@ import multiprocessing import os import random import threading +import time import unittest import httplib2 @@ -427,6 +428,43 @@ class TestGCSIO(unittest.TestCase): f.seek(0) self.assertEqual(f.read(), random_file.contents) + def test_flaky_file_read(self): + file_name = 'gs://gcsio-test/flaky_file' + file_size = 5 * 1024 * 1024 + 100 + random_file = self._insert_random_file(self.client, file_name, file_size) + f = self.gcs.open(file_name) + random.seed(0) + f.buffer_size = 1024 * 1024 + f.segment_timeout = 0.1 + self.assertEqual(f.mode, 'r') + f._real_get_segment = f._get_segment + + def flaky_get_segment(start, size): + if random.randint(0, 3) == 1: + time.sleep(600) + return f._real_get_segment(start, size) + + f._get_segment = flaky_get_segment + self.assertEqual(f.read(), random_file.contents) + + # Test exception handling in file read. + def failing_get_segment(unused_start, unused_size): + raise IOError('Could not read.') + + f._get_segment = failing_get_segment + f.seek(0) + with self.assertRaises(IOError): + f.read() + + # Test retry limit in hanging file read. + def hanging_get_segment(unused_start, unused_size): + time.sleep(600) + + f._get_segment = hanging_get_segment + f.seek(0) + with self.assertRaises(gcsio.GcsIOError): + f.read() + def test_file_random_seek(self): file_name = 'gs://gcsio-test/seek_file' file_size = 5 * 1024 * 1024 - 100