arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-236: Bridging IO interfaces under the hood in pyarrow
Date Mon, 18 Jul 2016 22:37:41 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 55bfa8343 -> 59e5f9806


ARROW-236: Bridging IO interfaces under the hood in pyarrow

Author: Wes McKinney <wesm@apache.org>

Closes #104 from wesm/ARROW-236 and squashes the following commits:

73648e0 [Wes McKinney] cpplint
f2cd77f [Wes McKinney] Check in io.pxd
94bcd30 [Wes McKinney] Do not let Parquet close an Arrow file
9b9d94d [Wes McKinney] Barely working direct HDFS-Parquet reads
06ddd06 [Wes McKinney] Slight refactoring of read table to be able to also handle classes wrapping C++ file interfaces
c7a913e [Wes McKinney] Provide a means to expose abstract native file handles
e6724de [Wes McKinney] Implement alternate ctor to construct parquet::FileReader from an arrow::io::RandomAccessFile


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/59e5f980
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/59e5f980
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/59e5f980

Branch: refs/heads/master
Commit: 59e5f9806515e8a5360870c93082316f74d7ec7c
Parents: 55bfa83
Author: Wes McKinney <wesm@apache.org>
Authored: Mon Jul 18 15:37:27 2016 -0700
Committer: Wes McKinney <wesm@apache.org>
Committed: Mon Jul 18 15:37:27 2016 -0700

----------------------------------------------------------------------
 cpp/src/arrow/io/interfaces.h            |  1 +
 cpp/src/arrow/parquet/io.cc              | 19 ++++++--
 cpp/src/arrow/parquet/io.h               | 10 ++--
 cpp/src/arrow/parquet/parquet-io-test.cc |  8 ++-
 cpp/src/arrow/parquet/reader.cc          | 20 ++++++++
 cpp/src/arrow/parquet/reader.h           | 13 +++--
 cpp/src/arrow/parquet/writer.cc          |  1 -
 cpp/src/arrow/parquet/writer.h           |  2 +-
 python/pyarrow/includes/libarrow_io.pxd  | 49 ++++++++++++-------
 python/pyarrow/includes/parquet.pxd      | 24 +++++++--
 python/pyarrow/io.pxd                    | 32 ++++++++++++
 python/pyarrow/io.pyx                    | 19 +++++++-
 python/pyarrow/parquet.pyx               | 70 +++++++++++++++++++++------
 13 files changed, 216 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 25361d5..c212852 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -19,6 +19,7 @@
 #define ARROW_IO_INTERFACES_H
 
 #include <cstdint>
+#include <memory>
 
 namespace arrow {
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/cpp/src/arrow/parquet/io.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/io.cc b/cpp/src/arrow/parquet/io.cc
index c81aa8c..b6fdd67 100644
--- a/cpp/src/arrow/parquet/io.cc
+++ b/cpp/src/arrow/parquet/io.cc
@@ -55,12 +55,23 @@ void ParquetAllocator::Free(uint8_t* buffer, int64_t size) {
 // ----------------------------------------------------------------------
 // ParquetReadSource
 
-ParquetReadSource::ParquetReadSource(
-    const std::shared_ptr<ArrowROFile>& file, ParquetAllocator* allocator)
-    : file_(file), allocator_(allocator) {}
+ParquetReadSource::ParquetReadSource(ParquetAllocator* allocator)
+    : file_(nullptr), allocator_(allocator) {}
+
+Status ParquetReadSource::Open(const std::shared_ptr<io::RandomAccessFile>& file) {
+  int64_t file_size;
+  RETURN_NOT_OK(file->GetSize(&file_size));
+
+  file_ = file;
+  size_ = file_size;
+  return Status::OK();
+}
 
 void ParquetReadSource::Close() {
-  PARQUET_THROW_NOT_OK(file_->Close());
+  // TODO(wesm): Make this a no-op for now. This leaves Python wrappers for
+  // these classes in a borked state. Probably better to explicitly close.
+
+  // PARQUET_THROW_NOT_OK(file_->Close());
 }
 
 int64_t ParquetReadSource::Tell() const {

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/cpp/src/arrow/parquet/io.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/io.h b/cpp/src/arrow/parquet/io.h
index ef8871d..1c59695 100644
--- a/cpp/src/arrow/parquet/io.h
+++ b/cpp/src/arrow/parquet/io.h
@@ -49,7 +49,9 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator {
   uint8_t* Malloc(int64_t size) override;
   void Free(uint8_t* buffer, int64_t size) override;
 
-  MemoryPool* pool() { return pool_; }
+  void set_pool(MemoryPool* pool) { pool_ = pool; }
+
+  MemoryPool* pool() const { return pool_; }
 
  private:
   MemoryPool* pool_;
@@ -57,8 +59,10 @@ class ARROW_EXPORT ParquetAllocator : public ::parquet::MemoryAllocator {
 
 class ARROW_EXPORT ParquetReadSource : public ::parquet::RandomAccessSource {
  public:
-  ParquetReadSource(
-      const std::shared_ptr<io::RandomAccessFile>& file, ParquetAllocator* allocator);
+  explicit ParquetReadSource(ParquetAllocator* allocator);
+
+  // We need to ask for the file size on opening the file, and this can fail
+  Status Open(const std::shared_ptr<io::RandomAccessFile>& file);
 
   void Close() override;
   int64_t Tell() const override;

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/cpp/src/arrow/parquet/parquet-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
index 7e724b3..6615457 100644
--- a/cpp/src/arrow/parquet/parquet-io-test.cc
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -23,6 +23,7 @@
 #include "gtest/gtest.h"
 
 #include "arrow/parquet/io.h"
+#include "arrow/test-util.h"
 #include "arrow/util/memory-pool.h"
 #include "arrow/util/status.h"
 
@@ -147,9 +148,12 @@ TEST(TestParquetReadSource, Basics) {
   std::string data = "this is the data";
   auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
 
-  ParquetAllocator allocator;
+  ParquetAllocator allocator(default_memory_pool());
+
   auto file = std::make_shared<BufferReader>(data_buffer, data.size());
-  auto source = std::make_shared<ParquetReadSource>(file, &allocator);
+  auto source = std::make_shared<ParquetReadSource>(&allocator);
+
+  ASSERT_OK(source->Open(file));
 
   ASSERT_EQ(0, source->Tell());
   ASSERT_NO_THROW(source->Seek(5));

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/cpp/src/arrow/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc
index c7c400e..e92967e 100644
--- a/cpp/src/arrow/parquet/reader.cc
+++ b/cpp/src/arrow/parquet/reader.cc
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "arrow/column.h"
+#include "arrow/parquet/io.h"
 #include "arrow/parquet/schema.h"
 #include "arrow/parquet/utils.h"
 #include "arrow/schema.h"
@@ -35,6 +36,10 @@ using parquet::ColumnReader;
 using parquet::Repetition;
 using parquet::TypedColumnReader;
 
+// Help reduce verbosity
+using ParquetRAS = parquet::RandomAccessSource;
+using ParquetReader = parquet::ParquetFileReader;
+
 namespace arrow {
 namespace parquet {
 
@@ -181,6 +186,21 @@ FileReader::FileReader(
 
 FileReader::~FileReader() {}
 
+// Static ctor
+Status OpenFile(const std::shared_ptr<io::RandomAccessFile>& file,
+    ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader) {
+  std::unique_ptr<ParquetReadSource> source(new ParquetReadSource(allocator));
+  RETURN_NOT_OK(source->Open(file));
+
+  // TODO(wesm): reader properties
+  std::unique_ptr<ParquetReader> pq_reader;
+  PARQUET_CATCH_NOT_OK(pq_reader = ParquetReader::Open(std::move(source)));
+
+  // Use the same memory pool as the ParquetAllocator
+  reader->reset(new FileReader(allocator->pool(), std::move(pq_reader)));
+  return Status::OK();
+}
+
 Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
   return impl_->GetFlatColumn(i, out);
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/cpp/src/arrow/parquet/reader.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.h b/cpp/src/arrow/parquet/reader.h
index 2c8a9df..f1492f6 100644
--- a/cpp/src/arrow/parquet/reader.h
+++ b/cpp/src/arrow/parquet/reader.h
@@ -23,6 +23,8 @@
 #include "parquet/api/reader.h"
 #include "parquet/api/schema.h"
 
+#include "arrow/io/interfaces.h"
+#include "arrow/parquet/io.h"
 #include "arrow/util/visibility.h"
 
 namespace arrow {
@@ -99,7 +101,7 @@ class ARROW_EXPORT FileReader {
   virtual ~FileReader();
 
  private:
-  class Impl;
+  class ARROW_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
 };
 
@@ -125,15 +127,20 @@ class ARROW_EXPORT FlatColumnReader {
   Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
 
  private:
-  class Impl;
+  class ARROW_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
   explicit FlatColumnReader(std::unique_ptr<Impl> impl);
 
   friend class FileReader;
 };
 
-}  // namespace parquet
+// Helper function to create a file reader from an implementation of an Arrow
+// readable file
+ARROW_EXPORT
+Status OpenFile(const std::shared_ptr<io::RandomAccessFile>& file,
+    ParquetAllocator* allocator, std::unique_ptr<FileReader>* reader);
 
+}  // namespace parquet
 }  // namespace arrow
 
 #endif  // ARROW_PARQUET_READER_H

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/cpp/src/arrow/parquet/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc
index 0139edd..f9514aa 100644
--- a/cpp/src/arrow/parquet/writer.cc
+++ b/cpp/src/arrow/parquet/writer.cc
@@ -35,7 +35,6 @@ using parquet::ParquetVersion;
 using parquet::schema::GroupNode;
 
 namespace arrow {
-
 namespace parquet {
 
 class FileWriter::Impl {

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/cpp/src/arrow/parquet/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h
index 45d0fd5..5aa1ba5 100644
--- a/cpp/src/arrow/parquet/writer.h
+++ b/cpp/src/arrow/parquet/writer.h
@@ -55,7 +55,7 @@ class ARROW_EXPORT FileWriter {
   MemoryPool* memory_pool() const;
 
  private:
-  class Impl;
+  class ARROW_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
 };
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/python/pyarrow/includes/libarrow_io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/libarrow_io.pxd b/python/pyarrow/includes/libarrow_io.pxd
index d0fb8f9..734ace6 100644
--- a/python/pyarrow/includes/libarrow_io.pxd
+++ b/python/pyarrow/includes/libarrow_io.pxd
@@ -19,11 +19,37 @@
 
 from pyarrow.includes.common cimport *
 
-cdef extern from "arrow/io/interfaces.h" nogil:
+cdef extern from "arrow/io/interfaces.h" namespace "arrow::io" nogil:
+    enum FileMode" arrow::io::FileMode::type":
+        FileMode_READ" arrow::io::FileMode::READ"
+        FileMode_WRITE" arrow::io::FileMode::WRITE"
+        FileMode_READWRITE" arrow::io::FileMode::READWRITE"
+
     enum ObjectType" arrow::io::ObjectType::type":
         ObjectType_FILE" arrow::io::ObjectType::FILE"
         ObjectType_DIRECTORY" arrow::io::ObjectType::DIRECTORY"
 
+    cdef cppclass FileBase:
+        CStatus Close()
+        CStatus Tell(int64_t* position)
+
+    cdef cppclass ReadableFile(FileBase):
+        CStatus GetSize(int64_t* size)
+        CStatus Read(int64_t nbytes, int64_t* bytes_read,
+                     uint8_t* buffer)
+
+        CStatus ReadAt(int64_t position, int64_t nbytes,
+                       int64_t* bytes_read, uint8_t* buffer)
+
+    cdef cppclass RandomAccessFile(ReadableFile):
+        CStatus Seek(int64_t position)
+
+    cdef cppclass WriteableFile(FileBase):
+        CStatus Write(const uint8_t* buffer, int64_t nbytes)
+        # CStatus Write(const uint8_t* buffer, int64_t nbytes,
+        #               int64_t* bytes_written)
+
+
 cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
     CStatus ConnectLibHdfs()
 
@@ -44,24 +70,11 @@ cdef extern from "arrow/io/hdfs.h" namespace "arrow::io" nogil:
         int64_t block_size
         int16_t permissions
 
-    cdef cppclass CHdfsFile:
-        CStatus Close()
-        CStatus Seek(int64_t position)
-        CStatus Tell(int64_t* position)
-
-    cdef cppclass HdfsReadableFile(CHdfsFile):
-        CStatus GetSize(int64_t* size)
-        CStatus Read(int64_t nbytes, int64_t* bytes_read,
-                     uint8_t* buffer)
-
-        CStatus ReadAt(int64_t position, int64_t nbytes,
-                       int64_t* bytes_read, uint8_t* buffer)
-
-    cdef cppclass HdfsWriteableFile(CHdfsFile):
-        CStatus Write(const uint8_t* buffer, int64_t nbytes)
+    cdef cppclass HdfsReadableFile(RandomAccessFile):
+        pass
 
-        CStatus Write(const uint8_t* buffer, int64_t nbytes,
-                      int64_t* bytes_written)
+    cdef cppclass HdfsWriteableFile(WriteableFile):
+        pass
 
     cdef cppclass CHdfsClient" arrow::io::HdfsClient":
         @staticmethod

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/python/pyarrow/includes/parquet.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/includes/parquet.pxd b/python/pyarrow/includes/parquet.pxd
index a2f83ea..fe24f59 100644
--- a/python/pyarrow/includes/parquet.pxd
+++ b/python/pyarrow/includes/parquet.pxd
@@ -19,6 +19,7 @@
 
 from pyarrow.includes.common cimport *
 from pyarrow.includes.libarrow cimport CSchema, CStatus, CTable, MemoryPool
+from pyarrow.includes.libarrow_io cimport RandomAccessFile
 
 
 cdef extern from "parquet/api/schema.h" namespace "parquet::schema" nogil:
@@ -90,19 +91,36 @@ cdef extern from "parquet/api/writer.h" namespace "parquet" nogil:
             shared_ptr[WriterProperties] build()
 
 
+cdef extern from "arrow/parquet/io.h" namespace "arrow::parquet" nogil:
+    cdef cppclass ParquetAllocator:
+        ParquetAllocator()
+        ParquetAllocator(MemoryPool* pool)
+        MemoryPool* pool()
+        void set_pool(MemoryPool* pool)
+
+    cdef cppclass ParquetReadSource:
+        ParquetReadSource(ParquetAllocator* allocator)
+        Open(const shared_ptr[RandomAccessFile]& file)
+
+
 cdef extern from "arrow/parquet/reader.h" namespace "arrow::parquet" nogil:
+    CStatus OpenFile(const shared_ptr[RandomAccessFile]& file,
+                     ParquetAllocator* allocator,
+                     unique_ptr[FileReader]* reader)
+
     cdef cppclass FileReader:
         FileReader(MemoryPool* pool, unique_ptr[ParquetFileReader] reader)
         CStatus ReadFlatTable(shared_ptr[CTable]* out);
 
 
 cdef extern from "arrow/parquet/schema.h" namespace "arrow::parquet" nogil:
-    CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema, shared_ptr[CSchema]* out)
-    CStatus ToParquetSchema(const CSchema* arrow_schema, shared_ptr[SchemaDescriptor]* out)
+    CStatus FromParquetSchema(const SchemaDescriptor* parquet_schema,
+                              shared_ptr[CSchema]* out)
+    CStatus ToParquetSchema(const CSchema* arrow_schema,
+                            shared_ptr[SchemaDescriptor]* out)
 
 
 cdef extern from "arrow/parquet/writer.h" namespace "arrow::parquet" nogil:
     cdef CStatus WriteFlatTable(const CTable* table, MemoryPool* pool,
             const shared_ptr[OutputStream]& sink, int64_t chunk_size,
             const shared_ptr[WriterProperties]& properties)
-

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/python/pyarrow/io.pxd
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pxd b/python/pyarrow/io.pxd
new file mode 100644
index 0000000..b92af72
--- /dev/null
+++ b/python/pyarrow/io.pxd
@@ -0,0 +1,32 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# distutils: language = c++
+
+from pyarrow.includes.common cimport *
+from pyarrow.includes.libarrow cimport *
+from pyarrow.includes.libarrow_io cimport RandomAccessFile, WriteableFile
+
+
+cdef class NativeFileInterface:
+
+    # By implementing these "virtual" functions (all functions in Cython
+    # extension classes are technically virtual in the C++ sense), we can
+    # expose the arrow::io abstract file interfaces to other components
+    # throughout the suite of Arrow C++ libraries
+    cdef read_handle(self, shared_ptr[RandomAccessFile]* file)
+    cdef write_handle(self, shared_ptr[WriteableFile]* file)

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/python/pyarrow/io.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/io.pyx b/python/pyarrow/io.pyx
index 071eea5..b8bf883 100644
--- a/python/pyarrow/io.pyx
+++ b/python/pyarrow/io.pyx
@@ -164,7 +164,7 @@ cdef class HdfsClient:
                           .ListDirectory(c_path, &listing))
 
         cdef const HdfsPathInfo* info
-        for i in range(listing.size()):
+        for i in range(<int> listing.size()):
             info = &listing[i]
 
             # Try to trim off the hdfs://HOST:PORT piece
@@ -314,8 +314,15 @@ cdef class HdfsClient:
         f = self.open(path, 'rb', buffer_size=buffer_size)
         f.download(stream)
 
+cdef class NativeFileInterface:
 
-cdef class HdfsFile:
+    cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
+        raise NotImplementedError
+
+    cdef write_handle(self, shared_ptr[WriteableFile]* file):
+        raise NotImplementedError
+
+cdef class HdfsFile(NativeFileInterface):
     cdef:
         shared_ptr[HdfsReadableFile] rd_file
         shared_ptr[HdfsWriteableFile] wr_file
@@ -357,6 +364,14 @@ cdef class HdfsFile:
         if self.is_readonly:
             raise IOError("only valid on writeonly files")
 
+    cdef read_handle(self, shared_ptr[RandomAccessFile]* file):
+        self._assert_readable()
+        file[0] = <shared_ptr[RandomAccessFile]> self.rd_file
+
+    cdef write_handle(self, shared_ptr[WriteableFile]* file):
+        self._assert_writeable()
+        file[0] = <shared_ptr[WriteableFile]> self.wr_file
+
     def size(self):
         cdef int64_t size
         self._assert_readable()

http://git-wip-us.apache.org/repos/asf/arrow/blob/59e5f980/python/pyarrow/parquet.pyx
----------------------------------------------------------------------
diff --git a/python/pyarrow/parquet.pyx b/python/pyarrow/parquet.pyx
index 0b2b208..ebba1a1 100644
--- a/python/pyarrow/parquet.pyx
+++ b/python/pyarrow/parquet.pyx
@@ -20,34 +20,75 @@
 # cython: embedsignature = True
 
 from pyarrow.includes.libarrow cimport *
-cimport pyarrow.includes.pyarrow as pyarrow
 from pyarrow.includes.parquet cimport *
+from pyarrow.includes.libarrow_io cimport RandomAccessFile, WriteableFile
+cimport pyarrow.includes.pyarrow as pyarrow
 
 from pyarrow.compat import tobytes
 from pyarrow.error import ArrowException
 from pyarrow.error cimport check_cstatus
+from pyarrow.io import NativeFileInterface
 from pyarrow.table cimport Table
 
-def read_table(filename, columns=None):
+from pyarrow.io cimport NativeFileInterface
+
+import six
+
+
+cdef class ParquetReader:
+    cdef:
+        ParquetAllocator allocator
+        unique_ptr[FileReader] reader
+
+    def __cinit__(self):
+        self.allocator.set_pool(default_memory_pool())
+
+    cdef open_local_file(self, file_path):
+        cdef c_string path = tobytes(file_path)
+
+        # Must be in one expression to avoid calling std::move which is not
+        # possible in Cython (due to missing rvalue support)
+
+        # TODO(wesm): ParquetFileReader::OpenFile can throw?
+        self.reader = unique_ptr[FileReader](
+            new FileReader(default_memory_pool(),
+                           ParquetFileReader.OpenFile(path)))
+
+    cdef open_native_file(self, NativeFileInterface file):
+        cdef shared_ptr[RandomAccessFile] cpp_handle
+        file.read_handle(&cpp_handle)
+
+        check_cstatus(OpenFile(cpp_handle, &self.allocator, &self.reader))
+
+    def read_all(self):
+        cdef:
+            Table table = Table()
+            shared_ptr[CTable] ctable
+
+        with nogil:
+            check_cstatus(self.reader.get()
+                          .ReadFlatTable(&ctable))
+
+        table.init(ctable)
+        return table
+
+
+def read_table(source, columns=None):
     """
     Read a Table from Parquet format
     Returns
     -------
     table: pyarrow.Table
     """
-    cdef unique_ptr[FileReader] reader
-    cdef Table table = Table()
-    cdef shared_ptr[CTable] ctable
-
-    # Must be in one expression to avoid calling std::move which is not possible
-    # in Cython (due to missing rvalue support)
-    reader = unique_ptr[FileReader](new FileReader(default_memory_pool(),
-        ParquetFileReader.OpenFile(tobytes(filename))))
-    with nogil:
-        check_cstatus(reader.get().ReadFlatTable(&ctable))
+    cdef ParquetReader reader = ParquetReader()
+
+    if isinstance(source, six.string_types):
+        reader.open_local_file(source)
+    elif isinstance(source, NativeFileInterface):
+        reader.open_native_file(source)
+
+    return reader.read_all()
 
-    table.init(ctable)
-    return table
 
 def write_table(table, filename, chunk_size=None, version=None):
     """
@@ -84,4 +125,3 @@ def write_table(table, filename, chunk_size=None, version=None):
     with nogil:
         check_cstatus(WriteFlatTable(ctable_, default_memory_pool(), sink,
             chunk_size_, properties_builder.build()))
-


Mime
View raw message