arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject arrow git commit: ARROW-404: [Python] Fix segfault caused by HdfsClient getting closed before an HdfsFile
Date Fri, 09 Dec 2016 05:50:09 GMT
Repository: arrow
Updated Branches:
  refs/heads/master c8eb49e41 -> e139b8b7c


ARROW-404: [Python] Fix segfault caused by HdfsClient getting closed before an HdfsFile

The one downside of this patch is that HdfsFile handles don't get garbage-collected until
the cyclic GC runs -- I tried to fix this but couldn't get it working. So bytes don't always
get flushed to HDFS until `close()` is called. The flush issue should be addressed on the
C++ side.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #230 from wesm/ARROW-404 and squashes the following commits:

3a8e641 [Wes McKinney] Use weakref in _HdfsFileNanny to avoid cyclic gc
274d0c5 [Wes McKinney] amend comment
1539a2c [Wes McKinney] Ensure that HdfsClient does not get closed before an open file does
when the last user-accessible client reference goes out of scope


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e139b8b7
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e139b8b7
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e139b8b7

Branch: refs/heads/master
Commit: e139b8b7c11b7f36fa57a625a39d9c8779d033f4
Parents: c8eb49e
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Fri Dec 9 06:49:49 2016 +0100
Committer: Uwe L. Korn <uwelk@xhochy.com>
Committed: Fri Dec 9 06:49:49 2016 +0100

----------------------------------------------------------------------
 python/pyarrow/io.pyx             | 86 ++++++++++++++++++++++------------
 python/pyarrow/tests/test_hdfs.py | 23 +++++++++
 2 files changed, 79 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/e139b8b7/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 0e6b81e..2fa5fb6 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -504,7 +504,7 @@ cdef class HdfsClient:
 
         out.mode = mode
         out.buffer_size = c_buffer_size
-        out.parent = self
+        out.parent = _HdfsFileNanny(self, out)
         out.is_open = True
         out.own_file = True
 
@@ -516,48 +516,69 @@ cdef class HdfsClient:
         """
         write_queue = Queue(50)
 
-        f = self.open(path, 'wb')
+        with self.open(path, 'wb') as f:
+            done = False
+            exc_info = None
+            def bg_write():
+                try:
+                    while not done or write_queue.qsize() > 0:
+                        try:
+                            buf = write_queue.get(timeout=0.01)
+                        except QueueEmpty:
+                            continue
 
-        done = False
-        exc_info = None
-        def bg_write():
-            try:
-                while not done or write_queue.qsize() > 0:
-                    try:
-                        buf = write_queue.get(timeout=0.01)
-                    except QueueEmpty:
-                        continue
+                        f.write(buf)
 
-                    f.write(buf)
+                except Exception as e:
+                    exc_info = sys.exc_info()
 
-            except Exception as e:
-                exc_info = sys.exc_info()
-
-        writer_thread = threading.Thread(target=bg_write)
-        writer_thread.start()
+            writer_thread = threading.Thread(target=bg_write)
+            writer_thread.start()
 
-        try:
-            while True:
-                buf = stream.read(buffer_size)
-                if not buf:
-                    break
+            try:
+                while True:
+                    buf = stream.read(buffer_size)
+                    if not buf:
+                        break
 
-                write_queue.put_nowait(buf)
-        finally:
-            done = True
+                    write_queue.put_nowait(buf)
+            finally:
+                done = True
 
-        writer_thread.join()
-        if exc_info is not None:
-            raise exc_info[0], exc_info[1], exc_info[2]
+            writer_thread.join()
+            if exc_info is not None:
+                raise exc_info[0], exc_info[1], exc_info[2]
 
     def download(self, path, stream, buffer_size=None):
-        f = self.open(path, 'rb', buffer_size=buffer_size)
-        f.download(stream)
+        with self.open(path, 'rb', buffer_size=buffer_size) as f:
+            f.download(stream)
 
 
 # ----------------------------------------------------------------------
 # Specialization for HDFS
 
+# ARROW-404: Helper class to ensure that files are closed before the
+# client. During deallocation of the extension class, the attributes are
+# decref'd which can cause the client to get closed first if the file has the
+# last remaining reference
+cdef class _HdfsFileNanny:
+    cdef:
+        object client
+        object file_handle_ref
+
+    def __cinit__(self, client, file_handle):
+        import weakref
+        self.client = client
+        self.file_handle_ref = weakref.ref(file_handle)
+
+    def __dealloc__(self):
+        fh = self.file_handle_ref()
+        if fh:
+            fh.close()
+        # avoid cyclic GC
+        self.file_handle_ref = None
+        self.client = None
+
 
 cdef class HdfsFile(NativeFile):
     cdef readonly:
@@ -565,6 +586,11 @@ cdef class HdfsFile(NativeFile):
         object mode
         object parent
 
+    cdef object __weakref__
+
+    def __dealloc__(self):
+        self.parent = None
+
     def read(self, int nbytes):
         """
         Read indicated number of bytes from the file, up to EOF

http://git-wip-us.apache.org/repos/asf/arrow/blob/e139b8b7/python/pyarrow/tests/test_hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index ed8d419..c23543b 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -98,6 +98,29 @@ def test_hdfs_ls(hdfs):
     assert contents == [dir_path, f1_path]
 
 
+def _make_test_file(hdfs, test_name, test_path, test_data):
+    base_path = pjoin(HDFS_TMP_PATH, test_name)
+    hdfs.mkdir(base_path)
+
+    full_path = pjoin(base_path, test_path)
+
+    f = hdfs.open(full_path, 'wb')
+    f.write(test_data)
+
+    return full_path
+
+
+@libhdfs
+def test_hdfs_orphaned_file():
+    hdfs = hdfs_test_client()
+    file_path = _make_test_file(hdfs, 'orphaned_file_test', 'fname',
+                                'foobarbaz')
+
+    f = hdfs.open(file_path)
+    hdfs = None
+    f = None  # noqa
+
+
 @libhdfs
 def test_hdfs_download_upload(hdfs):
     base_path = pjoin(HDFS_TMP_PATH, 'upload-test')


Mime
View raw message