From: wesm@apache.org
To: commits@arrow.apache.org
Date: Fri, 27 Oct 2017 02:58:57 +0000
Subject: [arrow] branch master updated: ARROW-1555 [Python] Implement Dask exists function
X-Git-Repo: arrow
X-Git-Refname: refs/heads/master
X-Git-Oldrev: 2ed886ee707822b398cdb22f4d4c10116fa1d9f3
X-Git-Newrev: 4db0046af18bfe90ecec5510e1962e81b8eccf6b

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git

The following commit(s) were added to refs/heads/master by this push:
     new 4db0046  ARROW-1555 [Python] Implement Dask exists function
4db0046 is described below

commit 4db0046af18bfe90ecec5510e1962e81b8eccf6b
Author: Benjamin Goldberg
AuthorDate: Thu Oct 26 22:55:18 2017 -0400

    ARROW-1555 [Python] Implement Dask exists function

Hi Maintainers,

This is a tiny patch that adds an `exists` implementation for the Dask filesystem, closing [ARROW-1555](https://issues.apache.org/jira/browse/ARROW-1555).

I ran into an issue while attempting to use the new `S3FSWrapper` with `s3fs` to upload a Parquet file to S3. The Parquet writer first checks whether the destination directory exists on the filesystem, and because `DaskFileSystem` does not override `exists`, the call falls through to the base class's `exists`, which raises `NotImplementedError`.
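A minimal, self-contained sketch of that failure mode (the class names mirror `pyarrow.filesystem`, but the bodies here are simplified stand-ins, not the library's actual code):

```
# Sketch of the pre-patch hierarchy: the base class declares exists()
# as an abstract stub, and DaskFileSystem does not override it.
class FileSystem(object):
    def exists(self, path):
        raise NotImplementedError

class DaskFileSystem(FileSystem):
    pass  # no exists() override before this patch

fs = DaskFileSystem()
try:
    fs.exists('bucket/key')
except NotImplementedError:
    print("exists() fell through to the abstract base class")
```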
The following pseudo-code triggers the error end to end (`<bucket>` and `<key>` are placeholders):

```
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

df = pd.DataFrame()
pa_table = pa.Table.from_pandas(df)
dst_path = 's3://<bucket>/<key>.parq.snappy'  # placeholder bucket and key
s3_fs = pa.filesystem.S3FSWrapper(fs=s3fs.S3FileSystem())
pq.write_to_dataset(pa_table, dst_path, filesystem=s3_fs)
```

Author: Benjamin Goldberg
Author: Wes McKinney

Closes #1240 from benjigoldberg/feature/dask-fs-exists-impl and squashes the following commits:

cf9946c9 [Wes McKinney] Fix flakes
bd9f2473 [Benjamin Goldberg] make filestore check private
5d9a1614 [Benjamin Goldberg] Reverse order of fs type checks
d285a84f [Benjamin Goldberg] Add an attribute on FileSystem to indicate whether its a file store or object store
51d2cbfa [Benjamin Goldberg] Reference the fs instead of os
649f572a [Benjamin Goldberg] Implement Dask exists function
---
 python/pyarrow/filesystem.py         | 23 +++++++++++++++++++++++
 python/pyarrow/parquet.py            |  4 ++--
 python/pyarrow/tests/test_parquet.py |  1 +
 3 files changed, 26 insertions(+), 2 deletions(-)

diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
index 8d2d8fc..926df0e 100644
--- a/python/pyarrow/filesystem.py
+++ b/python/pyarrow/filesystem.py
@@ -135,6 +135,13 @@ class FileSystem(object):
         """
         raise NotImplementedError

+    def _isfilestore(self):
+        """
+        Returns True if this FileSystem is a unix-style file store with
+        directories.
+        """
+        raise NotImplementedError
+
     def read_parquet(self, path, columns=None, metadata=None, schema=None,
                      nthreads=1, use_pandas_metadata=False):
         """
@@ -209,6 +216,10 @@ class LocalFileSystem(FileSystem):
     def isfile(self, path):
         return os.path.isfile(path)

+    @implements(FileSystem._isfilestore)
+    def _isfilestore(self):
+        return True
+
     @implements(FileSystem.exists)
     def exists(self, path):
         return os.path.exists(path)
@@ -247,10 +258,22 @@ class DaskFileSystem(FileSystem):
     def isfile(self, path):
         raise NotImplementedError("Unsupported file system API")

+    @implements(FileSystem._isfilestore)
+    def _isfilestore(self):
+        """
+        Object Stores like S3 and GCSFS are based on key lookups, not true
+        file-paths
+        """
+        return False
+
     @implements(FileSystem.delete)
     def delete(self, path, recursive=False):
         return self.fs.rm(path, recursive=recursive)

+    @implements(FileSystem.exists)
+    def exists(self, path):
+        return self.fs.exists(path)
+
     @implements(FileSystem.mkdir)
     def mkdir(self, path):
         return self.fs.mkdir(path)
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index 0a40f5f..9dcc30c 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -985,7 +985,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
     else:
         fs = _ensure_filesystem(filesystem)

-    if not fs.exists(root_path):
+    if fs._isfilestore() and not fs.exists(root_path):
         fs.mkdir(root_path)

     if partition_cols is not None and len(partition_cols) > 0:
@@ -1004,7 +1004,7 @@ def write_to_dataset(table, root_path, partition_cols=None,
             subtable = Table.from_pandas(subgroup,
                                          preserve_index=preserve_index)
             prefix = "/".join([root_path, subdir])
-            if not fs.exists(prefix):
+            if fs._isfilestore() and not fs.exists(prefix):
                 fs.mkdir(prefix)
             outfile = compat.guid() + ".parquet"
             full_path = "/".join([prefix, outfile])
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index 09184cc..a7fe98c 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -978,6 +978,7 @@ def _generate_partition_directories(fs, base_dir, partition_spec, df):
                 part_table = pa.Table.from_pandas(filtered_df)
                 with fs.open(file_path, 'wb') as f:
                     _write_table(part_table, f)
+                assert fs.exists(file_path)
             else:
                 _visit_level(level_dir, level + 1, this_part_keys)
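With this change applied, `write_to_dataset` consults `_isfilestore()` before probing `exists`/`mkdir`, and `DaskFileSystem.exists` delegates to the wrapped filesystem. A rough sketch of the resulting behavior (`my-bucket` is a placeholder, and working S3 credentials for `s3fs` are assumed):

```
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
import s3fs

# A local filesystem is a real file store, so the mkdir probe still runs.
assert pa.filesystem.LocalFileSystem()._isfilestore()

# S3 is an object store: _isfilestore() returns False, so
# write_to_dataset skips the exists()/mkdir() probe that used to raise.
table = pa.Table.from_pandas(pd.DataFrame({'n': [1, 2, 3]}))
s3_fs = pa.filesystem.S3FSWrapper(fs=s3fs.S3FileSystem())
pq.write_to_dataset(table, 's3://my-bucket/dataset', filesystem=s3_fs)
```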
--
To stop receiving notification emails like this one, please contact commits@arrow.apache.org.