arrow-commits mailing list archives

From w...@apache.org
Subject [arrow] branch master updated: ARROW-1555 [Python] Implement Dask exists function
Date Fri, 27 Oct 2017 02:58:57 GMT
This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 4db0046  ARROW-1555 [Python] Implement Dask exists function
4db0046 is described below

commit 4db0046af18bfe90ecec5510e1962e81b8eccf6b
Author: Benjamin Goldberg <ben@spothero.com>
AuthorDate: Thu Oct 26 22:55:18 2017 -0400

    ARROW-1555 [Python] Implement Dask exists function
    
    Hi Maintainers,
    
    This is a tiny patch that adds an `exists` implementation for the Dask
    filesystem, closing [ARROW-1555](https://issues.apache.org/jira/browse/ARROW-1555).
    
    I ran into an issue when using the new `S3FSWrapper` with `s3fs` to upload a
    Parquet file to S3. The pseudo-code below raises `NotImplementedError` because
    the Parquet implementation first checks whether the destination directory
    exists on the filesystem. Since `DaskFileSystem` does not implement `exists`,
    the call falls through to the base class, which raises `NotImplementedError`.
    
    ```
    import pandas as pd
    import pyarrow as pa
    import pyarrow.parquet as pq
    import s3fs

    df = pd.DataFrame(<some_data>)  # placeholder data
    pa_table = pa.Table.from_pandas(df)
    dst_path = "s3://<bucket>/<path>.parq.snappy"
    s3_fs = pa.filesystem.S3FSWrapper(fs=s3fs.S3FileSystem())
    pq.write_to_dataset(pa_table, dst_path, filesystem=s3_fs)
    ```
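
    For reference, a minimal sketch (paraphrased, not the exact pyarrow source) of
    why the call fails before this patch: the abstract base class only defines the
    interface, and `DaskFileSystem` never overrode `exists`.

    ```
    class FileSystem(object):
        def exists(self, path):
            """Return True if path exists on this filesystem."""
            raise NotImplementedError


    class DaskFileSystem(FileSystem):
        def __init__(self, fs):
            self.fs = fs  # the wrapped filesystem, e.g. s3fs.S3FileSystem
        # Before this patch: no exists() override here, so calls fall
        # through to FileSystem.exists and raise NotImplementedError.
    ```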
    
    Author: Benjamin Goldberg <ben@spothero.com>
    Author: Wes McKinney <wes.mckinney@twosigma.com>
    
    Closes #1240 from benjigoldberg/feature/dask-fs-exists-impl and squashes the
    following commits:
    
    cf9946c9 [Wes McKinney] Fix flakes
    bd9f2473 [Benjamin Goldberg] make filestore check private
    5d9a1614 [Benjamin Goldberg] Reverse order of fs type checks
    d285a84f [Benjamin Goldberg] Add an attribute on FileSystem to indicate
    whether it is a file store or object store
    51d2cbfa [Benjamin Goldberg] Reference the fs instead of os
    649f572a [Benjamin Goldberg] Implement Dask exists function
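
    With the patch applied, the snippet above should work: `write_to_dataset` now
    consults `_isfilestore()` before calling `mkdir`, and `DaskFileSystem.exists`
    delegates to the wrapped filesystem. A quick sanity check might look like this
    (bucket/path placeholders elided as above; `s3fs` credentials assumed to be
    configured in the environment):

    ```
    import s3fs
    import pyarrow as pa

    s3_fs = pa.filesystem.S3FSWrapper(fs=s3fs.S3FileSystem())

    # Now delegates to s3fs instead of raising NotImplementedError
    print(s3_fs.exists("s3://<bucket>/<path>.parq.snappy"))

    # Object stores are not unix-style file stores, so write_to_dataset
    # skips its mkdir() calls for them
    print(s3_fs._isfilestore())  # False
    ```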
---
 python/pyarrow/filesystem.py         | 23 +++++++++++++++++++++++
 python/pyarrow/parquet.py            |  4 ++--
 python/pyarrow/tests/test_parquet.py |  1 +
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
index 8d2d8fc..926df0e 100644
--- a/python/pyarrow/filesystem.py
+++ b/python/pyarrow/filesystem.py
@@ -135,6 +135,13 @@ class FileSystem(object):
         """
         raise NotImplementedError
 
+    def _isfilestore(self):
+        """
+        Returns True if this FileSystem is a unix-style file store with
+        directories.
+        """
+        raise NotImplementedError
+
     def read_parquet(self, path, columns=None, metadata=None, schema=None,
                      nthreads=1, use_pandas_metadata=False):
         """
@@ -209,6 +216,10 @@ class LocalFileSystem(FileSystem):
     def isfile(self, path):
         return os.path.isfile(path)
 
+    @implements(FileSystem._isfilestore)
+    def _isfilestore(self):
+        return True
+
     @implements(FileSystem.exists)
     def exists(self, path):
         return os.path.exists(path)
@@ -247,10 +258,22 @@ class DaskFileSystem(FileSystem):
     def isfile(self, path):
         raise NotImplementedError("Unsupported file system API")
 
+    @implements(FileSystem._isfilestore)
+    def _isfilestore(self):
+        """
+        Object Stores like S3 and GCSFS are based on key lookups, not true
+        file-paths
+        """
+        return False
+
     @implements(FileSystem.delete)
     def delete(self, path, recursive=False):
         return self.fs.rm(path, recursive=recursive)
 
+    @implements(FileSystem.exists)
+    def exists(self, path):
+        return self.fs.exists(path)
+
     @implements(FileSystem.mkdir)
     def mkdir(self, path):
         return self.fs.mkdir(path)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 0a40f5f..9dcc30c 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -985,7 +985,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
     else:
         fs = _ensure_filesystem(filesystem)
 
-    if not fs.exists(root_path):
+    if fs._isfilestore() and not fs.exists(root_path):
         fs.mkdir(root_path)
 
     if partition_cols is not None and len(partition_cols) > 0:
@@ -1004,7 +1004,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
             subtable = Table.from_pandas(subgroup,
                                          preserve_index=preserve_index)
             prefix = "/".join([root_path, subdir])
-            if not fs.exists(prefix):
+            if fs._isfilestore() and not fs.exists(prefix):
                 fs.mkdir(prefix)
             outfile = compat.guid() + ".parquet"
             full_path = "/".join([prefix, outfile])
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 09184cc..a7fe98c 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -978,6 +978,7 @@ def _generate_partition_directories(fs, base_dir, partition_spec, df):
                 part_table = pa.Table.from_pandas(filtered_df)
                 with fs.open(file_path, 'wb') as f:
                     _write_table(part_table, f)
+                assert fs.exists(file_path)
             else:
                 _visit_level(level_dir, level + 1, this_part_keys)
 

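For third-party `FileSystem` subclasses, the new `_isfilestore` hook is
straightforward to adopt. A minimal sketch follows; the `InMemoryFileSystem`
class is hypothetical, not part of pyarrow:

```
import pyarrow.filesystem as pafs


class InMemoryFileSystem(pafs.FileSystem):
    """Hypothetical key/value-backed filesystem, for illustration only."""

    def __init__(self):
        self._store = {}

    def _isfilestore(self):
        # Key lookups rather than real directories, so write_to_dataset
        # will skip its mkdir() calls for this filesystem.
        return False

    def exists(self, path):
        return path in self._store
```
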
-- 
To stop receiving notification emails like this one, please contact
"commits@arrow.apache.org" <commits@arrow.apache.org>.
