arrow-commits mailing list archives

From w...@apache.org
Subject arrow git commit: ARROW-1213: [Python] Support s3fs filesystem for Amazon S3 in ParquetDataset
Date Mon, 31 Jul 2017 22:46:58 GMT
Repository: arrow
Updated Branches:
  refs/heads/master b4eec6203 -> af2aeafca


ARROW-1213: [Python] Support s3fs filesystem for Amazon S3 in ParquetDataset

cc @yackoa

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #916 from wesm/ARROW-1213 and squashes the following commits:

f8a0aff1 [Wes McKinney] Add HDFS section to API docs
c54302df [Wes McKinney] Add deprecation warning for HdfsClient
4d3e7222 [Wes McKinney] Auto-wrap s3fs filesystem when using ParquetDataset
0be33bb8 [Wes McKinney] Implement os.walk emulation layer for s3fs
719f806d [Wes McKinney] Progress toward supporting s3fs in Parquet reader
bbd664ed [Wes McKinney] Refactor HdfsClient into pyarrow/hdfs.py. Add connect factory method.
Rename to HadoopFilesystem. Add walk implementation for HDFS, base Parquet directory walker
on that
4984a9d4 [Wes McKinney] Refactoring slightly
4c0bcf4a [Wes McKinney] Start on Dask filesystem wrapper, S3-Parquet dataset test case
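
For orientation, a hedged usage sketch of what this change enables (not part of the diff; bucket and credentials are placeholders), mirroring the new S3 test in test_parquet.py: a raw s3fs filesystem can now be passed straight to ParquetDataset, which auto-wraps it in the new S3FSWrapper.

    import s3fs
    import pyarrow.parquet as pq

    # Placeholders: substitute real credentials and an existing bucket/prefix
    fs = s3fs.S3FileSystem(key='<access-key>', secret='<secret-key>')

    # The raw s3fs object is auto-wrapped in pyarrow.filesystem.S3FSWrapper
    dataset = pq.ParquetDataset('s3://<bucket>/<dataset-prefix>', filesystem=fs)
    table = dataset.read()
    df = table.to_pandas()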


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/af2aeafc
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/af2aeafc
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/af2aeafc

Branch: refs/heads/master
Commit: af2aeafca6b5e7f460fa157d29b571b1ef3c2053
Parents: b4eec62
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Mon Jul 31 18:46:54 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Mon Jul 31 18:46:54 2017 -0400

----------------------------------------------------------------------
 python/.gitignore                    |   6 ++
 python/doc/source/api.rst            |  10 +++
 python/doc/source/filesystems.rst    |  15 ++--
 python/pyarrow/__init__.py           |  20 ++---
 python/pyarrow/filesystem.py         | 140 ++++++++++++++++++++----------
 python/pyarrow/hdfs.py               | 133 ++++++++++++++++++++++++++++
 python/pyarrow/parquet.py            |  67 ++++++++------
 python/pyarrow/tests/conftest.py     |  13 ++-
 python/pyarrow/tests/test_hdfs.py    |   2 +-
 python/pyarrow/tests/test_parquet.py |  50 +++++++++--
 python/pyarrow/util.py               |  12 +++
 11 files changed, 368 insertions(+), 100 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/.gitignore
----------------------------------------------------------------------
diff --git a/python/.gitignore b/python/.gitignore
index 6c0d5a9..1bf20c4 100644
--- a/python/.gitignore
+++ b/python/.gitignore
@@ -34,3 +34,9 @@ coverage.xml
 # benchmark working dir
 .asv
 pyarrow/_table_api.h
+
+# manylinux1 temporary files
+manylinux1/arrow
+
+# plasma store
+pyarrow/plasma_store
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/doc/source/api.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/api.rst b/python/doc/source/api.rst
index b84163b..6f26076 100644
--- a/python/doc/source/api.rst
+++ b/python/doc/source/api.rst
@@ -164,6 +164,16 @@ Input / Output and Shared Memory
    create_memory_map
    PythonFile
 
+Filesystems
+-----------
+
+.. autosummary::
+   :toctree: generated/
+
+   hdfs.connect
+   HadoopFilesystem
+   LocalFilesystem
+
 .. _api.ipc:
 
 Interprocess Communication and Messaging

http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/doc/source/filesystems.rst
----------------------------------------------------------------------
diff --git a/python/doc/source/filesystems.rst b/python/doc/source/filesystems.rst
index 61c03c5..78f6f2a 100644
--- a/python/doc/source/filesystems.rst
+++ b/python/doc/source/filesystems.rst
@@ -31,12 +31,13 @@ System. You connect like so:
 .. code-block:: python
 
    import pyarrow as pa
-   hdfs = pa.HdfsClient(host, port, user=user, kerb_ticket=ticket_cache_path)
+   hdfs = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path)
+   type(hdfs)
 
-By default, ``pyarrow.HdfsClient`` uses libhdfs, a JNI-based interface to the
-Java Hadoop client. This library is loaded **at runtime** (rather than at link
-/ library load time, since the library may not be in your LD_LIBRARY_PATH), and
-relies on some environment variables.
+By default, ``pyarrow.hdfs.HadoopFilesystem`` uses libhdfs, a JNI-based
+interface to the Java Hadoop client. This library is loaded **at runtime**
+(rather than at link / library load time, since the library may not be in your
+LD_LIBRARY_PATH), and relies on some environment variables.
 
 * ``HADOOP_HOME``: the root of your installed Hadoop distribution. Often has
   `lib/native/libhdfs.so`.
@@ -56,5 +57,5 @@ You can also use libhdfs3, a thirdparty C++ library for HDFS from Pivotal Labs:
 
 .. code-block:: python
 
-   hdfs3 = pa.HdfsClient(host, port, user=user, kerb_ticket=ticket_cache_path,
-                         driver='libhdfs3')
+   hdfs3 = pa.hdfs.connect(host, port, user=user, kerb_ticket=ticket_cache_path,
+                           driver='libhdfs3')
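
A hedged sketch (not part of the diff) of the environment setup described above; paths, host, and user are placeholders, and depending on the installation additional variables such as JAVA_HOME or CLASSPATH may also be required:

    import os

    # Placeholder path: point HADOOP_HOME at the root of the Hadoop distribution
    os.environ.setdefault('HADOOP_HOME', '/usr/local/hadoop')

    import pyarrow as pa

    # libhdfs (the JNI driver) is loaded on first use, not at import time
    hdfs = pa.hdfs.connect('namenode-host', 8020, user='hdfs-user')
    print(type(hdfs))  # pyarrow.hdfs.HadoopFilesystem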

http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/pyarrow/__init__.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/__init__.py b/python/pyarrow/__init__.py
index 6d0ce20..42e5803 100644
--- a/python/pyarrow/__init__.py
+++ b/python/pyarrow/__init__.py
@@ -88,7 +88,10 @@ from pyarrow.lib import (ArrowException,
                          ArrowTypeError)
 
 
-from pyarrow.filesystem import Filesystem, HdfsClient, LocalFilesystem
+from pyarrow.filesystem import Filesystem, LocalFilesystem
+
+from pyarrow.hdfs import HadoopFilesystem
+import pyarrow.hdfs as hdfs
 
 from pyarrow.ipc import (Message, MessageReader,
                          RecordBatchFileReader, RecordBatchFileWriter,
@@ -106,16 +109,7 @@ localfs = LocalFilesystem.get_instance()
 # ----------------------------------------------------------------------
 # 0.4.0 deprecations
 
-import warnings
-
-def _deprecate_class(old_name, new_name, klass, next_version='0.5.0'):
-    msg = ('pyarrow.{0} has been renamed to '
-           '{1}, will be removed in {2}'
-           .format(old_name, new_name, next_version))
-    def deprecated_factory(*args, **kwargs):
-        warnings.warn(msg, FutureWarning)
-        return klass(*args)
-    return deprecated_factory
+from pyarrow.util import _deprecate_class
 
 FileReader = _deprecate_class('FileReader',
                               'RecordBatchFileReader',
@@ -136,3 +130,7 @@ StreamWriter = _deprecate_class('StreamWriter',
 InMemoryOutputStream = _deprecate_class('InMemoryOutputStream',
                                         'BufferOutputStream',
                                         BufferOutputStream, '0.5.0')
+
+# Backwards compatibility with pyarrow < 0.6.0
+HdfsClient = _deprecate_class('HdfsClient', 'pyarrow.hdfs.connect',
+                              hdfs.connect, '0.6.0')
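
A hedged sketch (not part of the diff) of the transition path for existing callers; host and port are placeholders. The old name still resolves, but it now goes through the deprecation factory and emits a FutureWarning pointing at pyarrow.hdfs.connect:

    import pyarrow as pa

    # Deprecated spelling: still returns a HadoopFilesystem, but warns
    fs_old = pa.HdfsClient('namenode-host', 8020)

    # Preferred spelling going forward
    fs_new = pa.hdfs.connect('namenode-host', 8020)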

http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/pyarrow/filesystem.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/filesystem.py b/python/pyarrow/filesystem.py
index 9fa4f76..4b8ca32 100644
--- a/python/pyarrow/filesystem.py
+++ b/python/pyarrow/filesystem.py
@@ -17,9 +17,9 @@
 
 from os.path import join as pjoin
 import os
+import posixpath
 
 from pyarrow.util import implements
-import pyarrow.lib as lib
 
 
 class Filesystem(object):
@@ -44,6 +44,12 @@ class Filesystem(object):
         """
         raise NotImplementedError
 
+    def rm(self, path, recursive=False):
+        """
+        Alias for Filesystem.delete
+        """
+        return self.delete(path, recursive=recursive)
+
     def mkdir(self, path, create_parents=True):
         raise NotImplementedError
 
@@ -96,6 +102,12 @@ class Filesystem(object):
         return dataset.read(columns=columns, nthreads=nthreads,
                             use_pandas_metadata=use_pandas_metadata)
 
+    def open(self, path, mode='rb'):
+        """
+        Open file for reading or writing
+        """
+        raise NotImplementedError
+
     @property
     def pathsep(self):
         return '/'
@@ -134,6 +146,7 @@ class LocalFilesystem(Filesystem):
     def exists(self, path):
         return os.path.exists(path)
 
+    @implements(Filesystem.open)
     def open(self, path, mode='rb'):
         """
         Open file for reading or writing
@@ -144,68 +157,103 @@ class LocalFilesystem(Filesystem):
     def pathsep(self):
         return os.path.sep
 
+    def walk(self, top_dir):
+        """
+        Directory tree generator, see os.walk
+        """
+        return os.walk(top_dir)
 
-class HdfsClient(lib._HdfsClient, Filesystem):
+
+class DaskFilesystem(Filesystem):
     """
-    Connect to an HDFS cluster. All parameters are optional and should
-    only be set if the defaults need to be overridden.
-
-    Authentication should be automatic if the HDFS cluster uses Kerberos.
-    However, if a username is specified, then the ticket cache will likely
-    be required.
-
-    Parameters
-    ----------
-    host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
-    port : NameNode's port. Set to 0 for default or logical (HA) nodes.
-    user : Username when connecting to HDFS; None implies login user.
-    kerb_ticket : Path to Kerberos ticket cache.
-    driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs'
-      Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++
-      library from Pivotal Labs)
-
-    Notes
-    -----
-    The first time you call this method, it will take longer than usual due
-    to JNI spin-up time.
-
-    Returns
-    -------
-    client : HDFSClient
+    Wraps Dask filesystem implementations like s3fs, gcsfs, etc.
     """
 
-    def __init__(self, host="default", port=0, user=None, kerb_ticket=None,
-                 driver='libhdfs'):
-        self._connect(host, port, user, kerb_ticket, driver)
+    def __init__(self, fs):
+        self.fs = fs
 
     @implements(Filesystem.isdir)
     def isdir(self, path):
-        return lib._HdfsClient.isdir(self, path)
+        raise NotImplementedError("Unsupported file system API")
 
     @implements(Filesystem.isfile)
     def isfile(self, path):
-        return lib._HdfsClient.isfile(self, path)
+        raise NotImplementedError("Unsupported file system API")
 
     @implements(Filesystem.delete)
     def delete(self, path, recursive=False):
-        return lib._HdfsClient.delete(self, path, recursive)
+        return self.fs.rm(path, recursive=recursive)
 
     @implements(Filesystem.mkdir)
-    def mkdir(self, path, create_parents=True):
-        return lib._HdfsClient.mkdir(self, path)
+    def mkdir(self, path):
+        return self.fs.mkdir(path)
 
-    def ls(self, path, full_info=False):
+    @implements(Filesystem.open)
+    def open(self, path, mode='rb'):
+        """
+        Open file for reading or writing
         """
-        Retrieve directory contents and metadata, if requested.
+        return self.fs.open(path, mode=mode)
 
-        Parameters
-        ----------
-        path : HDFS path
-        full_info : boolean, default False
-            If False, only return list of paths
+    def ls(self, path, detail=False):
+        return self.fs.ls(path, detail=detail)
 
-        Returns
-        -------
-        result : list of dicts (full_info=True) or strings (full_info=False)
+    def walk(self, top_path):
+        """
+        Directory tree generator, like os.walk
+        """
+        return self.fs.walk(top_path)
+
+
+class S3FSWrapper(DaskFilesystem):
+
+    @implements(Filesystem.isdir)
+    def isdir(self, path):
+        try:
+            contents = self.fs.ls(path)
+            if len(contents) == 1 and contents[0] == path:
+                return False
+            else:
+                return True
+        except OSError:
+            return False
+
+    @implements(Filesystem.isfile)
+    def isfile(self, path):
+        try:
+            contents = self.fs.ls(path)
+            return len(contents) == 1 and contents[0] == path
+        except OSError:
+            return False
+
+    def walk(self, path, refresh=False):
+        """
+        Directory tree generator, like os.walk
+
+        Generator version of what is in s3fs, which yields a flattened list of
+        files
         """
-        return lib._HdfsClient.ls(self, path, full_info)
+        path = path.replace('s3://', '')
+        directories = set()
+        files = set()
+
+        for key in list(self.fs._ls(path, refresh=refresh)):
+            path = key['Key']
+            if key['StorageClass'] == 'DIRECTORY':
+                directories.add(path)
+            elif key['StorageClass'] == 'BUCKET':
+                pass
+            else:
+                files.add(path)
+
+        # s3fs creates duplicate 'DIRECTORY' entries
+        files = sorted([posixpath.split(f)[1] for f in files
+                        if f not in directories])
+        directories = sorted([posixpath.split(x)[1]
+                              for x in directories])
+
+        yield path, directories, files
+
+        for directory in directories:
+            for tup in self.walk(directory, refresh=refresh):
+                yield tup
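
A hedged sketch (not part of the diff) of how the wrapper is consumed; bucket and credentials are placeholders. ParquetManifest below only takes the first yield per directory level, so the sketch does the same: it lists the immediate children of a prefix, as base names rather than full keys:

    import s3fs

    from pyarrow.filesystem import S3FSWrapper

    # Placeholders for credentials and bucket
    fs = s3fs.S3FileSystem(key='<access-key>', secret='<secret-key>')
    wrapper = S3FSWrapper(fs)

    # First yield: (dirpath, dirnames, filenames) for the top-level prefix
    _, dirnames, filenames = next(wrapper.walk('s3://<bucket>/<dataset-prefix>'))
    print(dirnames)   # e.g. partition directories such as ['foo=0', 'foo=1']
    print(filenames)  # files directly under the prefix, if any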

http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/pyarrow/hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/hdfs.py b/python/pyarrow/hdfs.py
new file mode 100644
index 0000000..3240f99
--- /dev/null
+++ b/python/pyarrow/hdfs.py
@@ -0,0 +1,133 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+import posixpath
+
+from pyarrow.util import implements
+from pyarrow.filesystem import Filesystem
+import pyarrow.lib as lib
+
+
+class HadoopFilesystem(lib._HdfsClient, Filesystem):
+    """
+    Filesystem interface for HDFS cluster. See pyarrow.hdfs.connect for full
+    connection details
+    """
+
+    def __init__(self, host="default", port=0, user=None, kerb_ticket=None,
+                 driver='libhdfs'):
+        self._connect(host, port, user, kerb_ticket, driver)
+
+    @implements(Filesystem.isdir)
+    def isdir(self, path):
+        return lib._HdfsClient.isdir(self, path)
+
+    @implements(Filesystem.isfile)
+    def isfile(self, path):
+        return lib._HdfsClient.isfile(self, path)
+
+    @implements(Filesystem.delete)
+    def delete(self, path, recursive=False):
+        return lib._HdfsClient.delete(self, path, recursive)
+
+    @implements(Filesystem.mkdir)
+    def mkdir(self, path, create_parents=True):
+        return lib._HdfsClient.mkdir(self, path)
+
+    def ls(self, path, detail=False):
+        """
+        Retrieve directory contents and metadata, if requested.
+
+        Parameters
+        ----------
+        path : HDFS path
+        detail : boolean, default False
+            If False, only return list of paths
+
+        Returns
+        -------
+        result : list of dicts (detail=True) or strings (detail=False)
+        """
+        return lib._HdfsClient.ls(self, path, detail)
+
+    def walk(self, top_path):
+        """
+        Directory tree generator for HDFS, like os.walk
+
+        Parameters
+        ----------
+        top_path : string
+            Root directory for tree traversal
+
+        Returns
+        -------
+    Generator yielding 3-tuple (dirpath, dirnames, filenames)
+        """
+        contents = self.ls(top_path, detail=True)
+
+        directories, files = _libhdfs_walk_files_dirs(top_path, contents)
+        yield top_path, directories, files
+        for dirname in directories:
+            for tup in self.walk(dirname):
+                yield tup
+
+
+def _libhdfs_walk_files_dirs(top_path, contents):
+    files = []
+    directories = []
+    for c in contents:
+        scrubbed_name = posixpath.split(c['name'])[1]
+        if c['kind'] == 'file':
+            files.append(scrubbed_name)
+        else:
+            directories.append(scrubbed_name)
+
+    return directories, files
+
+
+def connect(host="default", port=0, user=None, kerb_ticket=None,
+            driver='libhdfs'):
+    """
+    Connect to an HDFS cluster. All parameters are optional and should
+    only be set if the defaults need to be overridden.
+
+    Authentication should be automatic if the HDFS cluster uses Kerberos.
+    However, if a username is specified, then the ticket cache will likely
+    be required.
+
+    Parameters
+    ----------
+    host : NameNode. Set to "default" for fs.defaultFS from core-site.xml.
+    port : NameNode's port. Set to 0 for default or logical (HA) nodes.
+    user : Username when connecting to HDFS; None implies login user.
+    kerb_ticket : Path to Kerberos ticket cache.
+    driver : {'libhdfs', 'libhdfs3'}, default 'libhdfs'
+      Connect using libhdfs (JNI-based) or libhdfs3 (3rd-party C++
+      library from Apache HAWQ (incubating))
+
+    Notes
+    -----
+    The first time you call this method, it will take longer than usual due
+    to JNI spin-up time.
+
+    Returns
+    -------
+    filesystem : HadoopFilesystem
+    """
+    fs = HadoopFilesystem(host=host, port=port, user=user,
+                          kerb_ticket=kerb_ticket, driver=driver)
+    return fs
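
A hedged sketch (not part of the diff) of the new connect() factory together with the ls() and walk() methods it exposes; host, port, and paths are placeholders:

    import pyarrow as pa

    # Placeholders: host='default' and port=0 take fs.defaultFS from core-site.xml
    fs = pa.hdfs.connect(host='default', port=0)

    # Plain paths by default; detail=True returns per-entry metadata dicts
    print(fs.ls('/tmp', detail=True))

    # os.walk-style listing of one directory level: (dirpath, dirnames, filenames)
    dirpath, dirnames, filenames = next(fs.walk('/tmp'))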

http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/pyarrow/parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.py b/python/pyarrow/parquet.py
index a3af9ae..c870412 100644
--- a/python/pyarrow/parquet.py
+++ b/python/pyarrow/parquet.py
@@ -22,7 +22,7 @@ import six
 
 import numpy as np
 
-from pyarrow.filesystem import LocalFilesystem
+from pyarrow.filesystem import Filesystem, LocalFilesystem
 from pyarrow._parquet import (ParquetReader, FileMetaData,  # noqa
                               RowGroupMetaData, ParquetSchema,
                               ParquetWriter)
@@ -416,40 +416,41 @@ class ParquetManifest(object):
         self._visit_level(0, self.dirpath, [])
 
     def _visit_level(self, level, base_path, part_keys):
-        directories = []
-        files = []
         fs = self.filesystem
 
-        if not fs.isdir(base_path):
-            raise ValueError('"{0}" is not a directory'.format(base_path))
-
-        for path in sorted(fs.ls(base_path)):
-            if fs.isfile(path):
-                if _is_parquet_file(path):
-                    files.append(path)
-                elif path.endswith('_common_metadata'):
-                    self.common_metadata_path = path
-                elif path.endswith('_metadata'):
-                    self.metadata_path = path
-                elif not self._should_silently_exclude(path):
-                    print('Ignoring path: {0}'.format(path))
-            elif fs.isdir(path):
-                directories.append(path)
+        _, directories, files = next(fs.walk(base_path))
+
+        filtered_files = []
+        for path in files:
+            full_path = self.pathsep.join((base_path, path))
+            if _is_parquet_file(path):
+                filtered_files.append(full_path)
+            elif path.endswith('_common_metadata'):
+                self.common_metadata_path = full_path
+            elif path.endswith('_metadata'):
+                self.metadata_path = full_path
+            elif not self._should_silently_exclude(path):
+                print('Ignoring path: {0}'.format(full_path))
 
         # ARROW-1079: Filter out "private" directories starting with underscore
-        directories = [x for x in directories if not _is_private_directory(x)]
+        filtered_directories = [self.pathsep.join((base_path, x))
+                                for x in directories
+                                if not _is_private_directory(x)]
+
+        filtered_files.sort()
+        filtered_directories.sort()
 
-        if len(files) > 0 and len(directories) > 0:
+        if len(files) > 0 and len(filtered_directories) > 0:
             raise ValueError('Found files in an intermediate '
                              'directory: {0}'.format(base_path))
-        elif len(directories) > 0:
-            self._visit_directories(level, directories, part_keys)
+        elif len(filtered_directories) > 0:
+            self._visit_directories(level, filtered_directories, part_keys)
         else:
-            self._push_pieces(files, part_keys)
+            self._push_pieces(filtered_files, part_keys)
 
-    def _should_silently_exclude(self, path):
-        _, tail = path.rsplit(self.pathsep, 1)
-        return tail.endswith('.crc') or tail in EXCLUDED_PARQUET_PATHS
+    def _should_silently_exclude(self, file_name):
+        return (file_name.endswith('.crc') or
+                file_name in EXCLUDED_PARQUET_PATHS)
 
     def _visit_directories(self, level, directories, part_keys):
         for path in directories:
@@ -523,7 +524,7 @@ class ParquetDataset(object):
         if filesystem is None:
             self.fs = LocalFilesystem.get_instance()
         else:
-            self.fs = filesystem
+            self.fs = _ensure_filesystem(filesystem)
 
         self.paths = path_or_paths
 
@@ -642,6 +643,18 @@ class ParquetDataset(object):
         return open_file
 
 
+def _ensure_filesystem(fs):
+    if not isinstance(fs, Filesystem):
+        if type(fs).__name__ == 'S3FileSystem':
+            from pyarrow.filesystem import S3FSWrapper
+            return S3FSWrapper(fs)
+        else:
+            raise IOError('Unrecognized filesystem: {0}'
+                          .format(type(fs)))
+    else:
+        return fs
+
+
 def _make_manifest(path_or_paths, fs, pathsep='/'):
     partitions = None
     metadata_path = None
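
The dispatch is narrow in this change: s3fs is recognized by class name, existing pyarrow filesystems pass through, and anything else is rejected. A hedged sketch (not part of the diff; credentials are placeholders, and _ensure_filesystem is a private helper imported here only to illustrate the dispatch):

    import s3fs

    from pyarrow.filesystem import LocalFilesystem
    from pyarrow.parquet import _ensure_filesystem

    # Placeholders for credentials
    fs = s3fs.S3FileSystem(key='<access-key>', secret='<secret-key>')
    print(type(_ensure_filesystem(fs)).__name__)  # S3FSWrapper

    # pyarrow filesystems are passed through unchanged
    print(type(_ensure_filesystem(LocalFilesystem.get_instance())).__name__)

    # Anything else raises
    try:
        _ensure_filesystem(object())
    except IOError as exc:
        print(exc)  # Unrecognized filesystem: <class 'object'>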

http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/pyarrow/tests/conftest.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/conftest.py b/python/pyarrow/tests/conftest.py
index 651438b..c6bd6c9 100644
--- a/python/pyarrow/tests/conftest.py
+++ b/python/pyarrow/tests/conftest.py
@@ -18,13 +18,22 @@
 from pytest import skip
 
 
-groups = ['hdfs', 'parquet', 'plasma', 'large_memory']
+groups = [
+    'hdfs',
+    'parquet',
+    'plasma',
+    'large_memory',
+    's3',
+]
+
 
 defaults = {
     'hdfs': False,
+    'large_memory': False,
     'parquet': False,
     'plasma': False,
-    'large_memory': False
+    'large_memory': False,
+    's3': False,
 }
 
 try:

http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/pyarrow/tests/test_hdfs.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_hdfs.py b/python/pyarrow/tests/test_hdfs.py
index cea02fb..1026408 100644
--- a/python/pyarrow/tests/test_hdfs.py
+++ b/python/pyarrow/tests/test_hdfs.py
@@ -43,7 +43,7 @@ def hdfs_test_client(driver='libhdfs'):
         raise ValueError('Env variable ARROW_HDFS_TEST_PORT was not '
                          'an integer')
 
-    return pa.HdfsClient(host, port, user, driver=driver)
+    return pa.hdfs.connect(host, port, user, driver=driver)
 
 
 @pytest.mark.hdfs

http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/pyarrow/tests/test_parquet.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/tests/test_parquet.py b/python/pyarrow/tests/test_parquet.py
index f840673..06265ca 100644
--- a/python/pyarrow/tests/test_parquet.py
+++ b/python/pyarrow/tests/test_parquet.py
@@ -700,6 +700,45 @@ def test_partition_set_dictionary_type():
 
 @parquet
 def test_read_partitioned_directory(tmpdir):
+    fs = LocalFilesystem.get_instance()
+    base_path = str(tmpdir)
+
+    _partition_test_for_filesystem(fs, base_path)
+
+
+@pytest.yield_fixture
+def s3_example():
+    access_key = os.environ['PYARROW_TEST_S3_ACCESS_KEY']
+    secret_key = os.environ['PYARROW_TEST_S3_SECRET_KEY']
+    bucket_name = os.environ['PYARROW_TEST_S3_BUCKET']
+
+    import s3fs
+    fs = s3fs.S3FileSystem(key=access_key, secret=secret_key)
+
+    test_dir = guid()
+
+    bucket_uri = 's3://{0}/{1}'.format(bucket_name, test_dir)
+    fs.mkdir(bucket_uri)
+    yield fs, bucket_uri
+    fs.rm(bucket_uri, recursive=True)
+
+
+@pytest.mark.s3
+@parquet
+def test_read_partitioned_directory_s3fs(s3_example):
+    from pyarrow.filesystem import S3FSWrapper
+    import pyarrow.parquet as pq
+
+    fs, bucket_uri = s3_example
+    wrapper = S3FSWrapper(fs)
+    _partition_test_for_filesystem(wrapper, bucket_uri)
+
+    # Check that we can auto-wrap
+    dataset = pq.ParquetDataset(bucket_uri, filesystem=fs)
+    dataset.read()
+
+
+def _partition_test_for_filesystem(fs, base_path):
     import pyarrow.parquet as pq
 
     foo_keys = [0, 1]
@@ -717,10 +756,9 @@ def test_read_partitioned_directory(tmpdir):
         'values': np.random.randn(N)
     }, columns=['index', 'foo', 'bar', 'values'])
 
-    base_path = str(tmpdir)
-    _generate_partition_directories(base_path, partition_spec, df)
+    _generate_partition_directories(fs, base_path, partition_spec, df)
 
-    dataset = pq.ParquetDataset(base_path)
+    dataset = pq.ParquetDataset(base_path, filesystem=fs)
     table = dataset.read()
     result_df = (table.to_pandas()
                  .sort_values(by='index')
@@ -737,12 +775,11 @@ def test_read_partitioned_directory(tmpdir):
     tm.assert_frame_equal(result_df, expected_df)
 
 
-def _generate_partition_directories(base_dir, partition_spec, df):
+def _generate_partition_directories(fs, base_dir, partition_spec, df):
     # partition_spec : list of lists, e.g. [['foo', [0, 1, 2],
     #                                       ['bar', ['a', 'b', 'c']]
     # part_table : a pyarrow.Table to write to each partition
     DEPTH = len(partition_spec)
-    fs = LocalFilesystem.get_instance()
 
     def _visit_level(base_dir, level, part_keys):
         name, values = partition_spec[level]
@@ -758,7 +795,8 @@ def _generate_partition_directories(base_dir, partition_spec, df):
 
                 filtered_df = _filter_partition(df, this_part_keys)
                 part_table = pa.Table.from_pandas(filtered_df)
-                _write_table(part_table, file_path)
+                with fs.open(file_path, 'wb') as f:
+                    _write_table(part_table, f)
             else:
                 _visit_level(level_dir, level + 1, this_part_keys)
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/af2aeafc/python/pyarrow/util.py
----------------------------------------------------------------------
diff --git a/python/pyarrow/util.py b/python/pyarrow/util.py
index 4b6a835..d984e19 100644
--- a/python/pyarrow/util.py
+++ b/python/pyarrow/util.py
@@ -15,6 +15,8 @@
 # specific language governing permissions and limitations
 # under the License.
 
+import warnings
+
 # Miscellaneous utility code
 
 
@@ -23,3 +25,13 @@ def implements(f):
         g.__doc__ = f.__doc__
         return g
     return decorator
+
+
+def _deprecate_class(old_name, new_name, klass, next_version='0.5.0'):
+    msg = ('pyarrow.{0} is deprecated as of {1}, please use {2} instead'
+           .format(old_name, next_version, new_name))
+
+    def deprecated_factory(*args, **kwargs):
+        warnings.warn(msg, FutureWarning)
+        return klass(*args, **kwargs)
+    return deprecated_factory
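
A hedged illustration (not part of the diff) of the relocated helper; the OldLocalFilesystem alias below is hypothetical and created only for the demonstration. The factory returns a callable that warns and then constructs the real object:

    import warnings

    from pyarrow.filesystem import LocalFilesystem
    from pyarrow.util import _deprecate_class

    # Hypothetical alias, used only for this illustration
    OldLocalFilesystem = _deprecate_class('OldLocalFilesystem', 'LocalFilesystem',
                                          LocalFilesystem, '0.6.0')

    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter('always')
        fs = OldLocalFilesystem()

    print(caught[-1].category.__name__)  # FutureWarning
    print(type(fs).__name__)             # LocalFilesystem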

