beam-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From dhalp...@apache.org
Subject [1/2] incubator-beam git commit: Handle HttpError in GCS upload thread
Date Fri, 15 Jul 2016 01:15:47 GMT
Repository: incubator-beam
Updated Branches:
  refs/heads/python-sdk a1a51c3c1 -> d898d56ae


Handle HttpError in GCS upload thread

* Break the connection to the main thread and propagate the exception.
* Retry in auth _refresh() to guard against temporary errors in the
  metadata service.


Project: http://git-wip-us.apache.org/repos/asf/incubator-beam/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-beam/commit/a4267d26
Tree: http://git-wip-us.apache.org/repos/asf/incubator-beam/tree/a4267d26
Diff: http://git-wip-us.apache.org/repos/asf/incubator-beam/diff/a4267d26

Branch: refs/heads/python-sdk
Commit: a4267d264395706f12479aa876501a62d5b679b7
Parents: a1a51c3
Author: Ahmet Altay <altay@google.com>
Authored: Fri Jul 8 16:28:07 2016 -0700
Committer: Dan Halperin <dhalperi@google.com>
Committed: Thu Jul 14 18:15:34 2016 -0700

----------------------------------------------------------------------
 sdks/python/apache_beam/internal/auth.py |  2 ++
 sdks/python/apache_beam/io/gcsio.py      | 23 ++++++++++++++++++++---
 2 files changed, 22 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4267d26/sdks/python/apache_beam/internal/auth.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/internal/auth.py b/sdks/python/apache_beam/internal/auth.py
index 0081970..f324a2d 100644
--- a/sdks/python/apache_beam/internal/auth.py
+++ b/sdks/python/apache_beam/internal/auth.py
@@ -82,6 +82,8 @@ class GCEMetadataCredentials(OAuth2Credentials):
         None,  # token_uri
         user_agent)
 
+  @retry.with_exponential_backoff(
+      retry_filter=retry.retry_on_server_errors_and_timeout_filter)
   def _refresh(self, http_request):
     refresh_time = datetime.datetime.now()
     req = urllib2.Request('http://metadata.google.internal/computeMetadata/v1/'

http://git-wip-us.apache.org/repos/asf/incubator-beam/blob/a4267d26/sdks/python/apache_beam/io/gcsio.py
----------------------------------------------------------------------
diff --git a/sdks/python/apache_beam/io/gcsio.py b/sdks/python/apache_beam/io/gcsio.py
index a01988b..c61f251 100644
--- a/sdks/python/apache_beam/io/gcsio.py
+++ b/sdks/python/apache_beam/io/gcsio.py
@@ -533,6 +533,7 @@ class GcsBufferedWriter(object):
 
     # Set up communication with uploading thread.
     parent_conn, child_conn = multiprocessing.Pipe()
+    self.child_conn = child_conn
     self.conn = parent_conn
 
     # Set up uploader.
@@ -547,6 +548,7 @@ class GcsBufferedWriter(object):
     # Start uploading thread.
     self.upload_thread = threading.Thread(target=self._start_upload)
     self.upload_thread.daemon = True
+    self.upload_thread.last_error = None
     self.upload_thread.start()
 
   # TODO(silviuc): Refactor so that retry logic can be applied.
@@ -560,7 +562,15 @@ class GcsBufferedWriter(object):
     #
     # The uploader by default transfers data in chunks of 1024 * 1024 bytes at
     # a time, buffering writes until that size is reached.
-    self.client.objects.Insert(self.insert_request, upload=self.upload)
+    try:
+      self.client.objects.Insert(self.insert_request, upload=self.upload)
+    except HttpError as http_error:
+      logging.error(
+          'HTTP error while inserting file %s: %s', self.path, http_error)
+      self.upload_thread.last_error = http_error
+      raise
+    finally:
+      self.child_conn.close()
 
   def write(self, data):
     """Write data to a GCS file.
@@ -574,8 +584,14 @@ class GcsBufferedWriter(object):
     self._check_open()
     if not data:
       return
-    self.conn.send_bytes(data)
-    self.position += len(data)
+    try:
+      self.conn.send_bytes(data)
+      self.position += len(data)
+    except IOError:
+      if self.upload_thread.last_error:
+        raise self.upload_thread.last_error  # pylint: disable=raising-bad-type
+      else:
+        raise
 
   def tell(self):
     """Return the total number of bytes passed to write() so far."""
@@ -583,6 +599,7 @@ class GcsBufferedWriter(object):
 
   def close(self):
     """Close the current GCS file."""
+    self.closed = True
     self.conn.close()
     self.upload_thread.join()
 


Mime
View raw message