arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-1641: [C++] Hide std::mutex from public headers
Date Sat, 07 Oct 2017 19:48:29 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 8309556c7 -> b29b06586


ARROW-1641: [C++] Hide std::mutex from public headers

This was one part of ARROW-1134 that we can push through. I had to do some refactoring since
there was a mutex in one of the base file interfaces. It doesn't appear that this will impact
parquet-cpp or other Arrow users.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #1165 from wesm/ARROW-1641 and squashes the following commits:

9f4a466d [Wes McKinney] Make PyReadableFile C++ threadsafe also
002c5a62 [Wes McKinney] Make ReadableFile::Read threadsafe again
e373ff2a [Wes McKinney] Rewrite CudaBufferWriter to not inherit from FixedSizeBufferWriter.
Fix deadlocks in ReadableFile
6ab45e8f [Wes McKinney] Hide mutexes from public headers


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b29b0658
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b29b0658
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b29b0658

Branch: refs/heads/master
Commit: b29b06586ff89bee93062ad082a930d30d5f45df
Parents: 8309556
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Sat Oct 7 15:48:18 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Sat Oct 7 15:48:18 2017 -0400

----------------------------------------------------------------------
 cpp/src/arrow/allocator-test.cc   |  15 ++-
 cpp/src/arrow/array.h             |   2 +-
 cpp/src/arrow/gpu/cuda_memory.cc  | 161 +++++++++++++++++++++++----------
 cpp/src/arrow/gpu/cuda_memory.h   |  21 ++---
 cpp/src/arrow/io/file.cc          |  43 ++++++++-
 cpp/src/arrow/io/file.h           |  13 +++
 cpp/src/arrow/io/interfaces.cc    |   2 -
 cpp/src/arrow/io/interfaces.h     |   9 +-
 cpp/src/arrow/io/memory.cc        | 133 +++++++++++++++++++--------
 cpp/src/arrow/io/memory.h         |  18 ++--
 cpp/src/arrow/memory_pool-test.cc |  29 +++---
 cpp/src/arrow/memory_pool.cc      | 100 ++++++++++----------
 cpp/src/arrow/memory_pool.h       |  21 -----
 cpp/src/arrow/python/io.cc        | 128 +++++++++++++++-----------
 cpp/src/arrow/python/io.h         |  24 ++---
 15 files changed, 442 insertions(+), 277 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/allocator-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/allocator-test.cc b/cpp/src/arrow/allocator-test.cc
index f3af47c..7c768be 100644
--- a/cpp/src/arrow/allocator-test.cc
+++ b/cpp/src/arrow/allocator-test.cc
@@ -59,17 +59,16 @@ TEST(stl_allocator, FreeLargeMemory) {
 }
 
 TEST(stl_allocator, MaxMemory) {
-  DefaultMemoryPool pool;
+  auto pool = default_memory_pool();
 
-  ASSERT_EQ(0, pool.max_memory());
-  stl_allocator<uint8_t> alloc(&pool);
-  uint8_t* data = alloc.allocate(100);
-  uint8_t* data2 = alloc.allocate(100);
+  stl_allocator<uint8_t> alloc(pool);
+  uint8_t* data = alloc.allocate(1000);
+  uint8_t* data2 = alloc.allocate(1000);
 
-  alloc.deallocate(data, 100);
-  alloc.deallocate(data2, 100);
+  alloc.deallocate(data, 1000);
+  alloc.deallocate(data2, 1000);
 
-  ASSERT_EQ(200, pool.max_memory());
+  ASSERT_EQ(2000, pool->max_memory());
 }
 
 #endif  // ARROW_VALGRIND

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/array.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h
index 975cd5b..36bceeb 100644
--- a/cpp/src/arrow/array.h
+++ b/cpp/src/arrow/array.h
@@ -179,7 +179,7 @@ class ARROW_EXPORT Array {
   /// boundscheck
   bool IsValid(int64_t i) const {
     return null_bitmap_data_ != nullptr &&
-      BitUtil::GetBit(null_bitmap_data_, i + data_->offset);
+           BitUtil::GetBit(null_bitmap_data_, i + data_->offset);
   }
 
   /// Size in the number of elements this array contains.

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/gpu/cuda_memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc
index d1026ca..949c1d7 100644
--- a/cpp/src/arrow/gpu/cuda_memory.cc
+++ b/cpp/src/arrow/gpu/cuda_memory.cc
@@ -21,6 +21,7 @@
 #include <cstdint>
 #include <cstdlib>
 #include <memory>
+#include <mutex>
 
 #include <cuda.h>
 
@@ -151,71 +152,139 @@ Status CudaBufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
 // ----------------------------------------------------------------------
 // CudaBufferWriter
 
-CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer)
-    : io::FixedSizeBufferWriter(buffer),
-      context_(buffer->context()),
-      buffer_size_(0),
-      buffer_position_(0) {}
-
-CudaBufferWriter::~CudaBufferWriter() {}
-
-Status CudaBufferWriter::Close() { return Flush(); }
+class CudaBufferWriter::CudaBufferWriterImpl {
+ public:
+  explicit CudaBufferWriterImpl(const std::shared_ptr<CudaBuffer>& buffer)
+      : context_(buffer->context()),
+        buffer_(buffer),
+        buffer_size_(0),
+        buffer_position_(0) {
+    buffer_ = buffer;
+    DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
+    mutable_data_ = buffer->mutable_data();
+    size_ = buffer->size();
+    position_ = 0;
+  }
 
-Status CudaBufferWriter::Flush() {
-  if (buffer_size_ > 0 && buffer_position_ > 0) {
-    // Only need to flush when the write has been buffered
-    RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_ - buffer_position_,
-                                             host_buffer_data_, buffer_position_));
-    buffer_position_ = 0;
+  Status Seek(int64_t position) {
+    if (position < 0 || position >= size_) {
+      return Status::IOError("position out of bounds");
+    }
+    position_ = position;
+    return Status::OK();
   }
-  return Status::OK();
-}
 
-Status CudaBufferWriter::Seek(int64_t position) {
-  if (buffer_position_ > 0) {
-    RETURN_NOT_OK(Flush());
+  Status Flush() {
+    if (buffer_size_ > 0 && buffer_position_ > 0) {
+      // Only need to flush when the write has been buffered
+      RETURN_NOT_OK(
+          context_->CopyHostToDevice(mutable_data_ + position_ - buffer_position_,
+                                     host_buffer_data_, buffer_position_));
+      buffer_position_ = 0;
+    }
+    return Status::OK();
   }
-  return io::FixedSizeBufferWriter::Seek(position);
-}
 
-Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
-  if (memcopy_num_threads_ > 1) {
-    return Status::Invalid("parallel CUDA memcpy not supported");
+  Status Tell(int64_t* position) const {
+    *position = position_;
+    return Status::OK();
   }
 
-  if (nbytes == 0) {
+  Status Write(const uint8_t* data, int64_t nbytes) {
+    if (nbytes == 0) {
+      return Status::OK();
+    }
+
+    if (buffer_size_ > 0) {
+      if (nbytes + buffer_position_ >= buffer_size_) {
+        // Reach end of buffer, write everything
+        RETURN_NOT_OK(Flush());
+        RETURN_NOT_OK(
+            context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
+      } else {
+        // Write bytes to buffer
+        std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
+        buffer_position_ += nbytes;
+      }
+    } else {
+      // Unbuffered write
+      RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
+    }
+    position_ += nbytes;
     return Status::OK();
   }
 
-  if (buffer_size_ > 0) {
-    if (nbytes + buffer_position_ >= buffer_size_) {
-      // Reach end of buffer, write everything
+  Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
+    std::lock_guard<std::mutex> guard(lock_);
+    RETURN_NOT_OK(Seek(position));
+    return Write(data, nbytes);
+  }
+
+  Status SetBufferSize(const int64_t buffer_size) {
+    if (buffer_position_ > 0) {
+      // Flush any buffered data
       RETURN_NOT_OK(Flush());
-      RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
-    } else {
-      // Write bytes to buffer
-      std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes);
-      buffer_position_ += nbytes;
     }
-  } else {
-    // Unbuffered write
-    RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes));
+    RETURN_NOT_OK(AllocateCudaHostBuffer(buffer_size, &host_buffer_));
+    host_buffer_data_ = host_buffer_->mutable_data();
+    buffer_size_ = buffer_size;
+    return Status::OK();
   }
-  position_ += nbytes;
-  return Status::OK();
+
+  int64_t buffer_size() const { return buffer_size_; }
+
+  int64_t buffer_position() const { return buffer_position_; }
+
+ private:
+  std::shared_ptr<CudaContext> context_;
+  std::shared_ptr<CudaBuffer> buffer_;
+  std::mutex lock_;
+  uint8_t* mutable_data_;
+  int64_t size_;
+  int64_t position_;
+
+  // Pinned host buffer for buffering writes on CPU before calling cudaMalloc
+  int64_t buffer_size_;
+  int64_t buffer_position_;
+  std::shared_ptr<CudaHostBuffer> host_buffer_;
+  uint8_t* host_buffer_data_;
+};
+
+CudaBufferWriter::CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer) {
+  impl_.reset(new CudaBufferWriterImpl(buffer));
 }
 
-Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
-  if (buffer_position_ > 0) {
-    // Flush any buffered data
+CudaBufferWriter::~CudaBufferWriter() {}
+
+Status CudaBufferWriter::Close() { return Flush(); }
+
+Status CudaBufferWriter::Flush() { return impl_->Flush(); }
+
+Status CudaBufferWriter::Seek(int64_t position) {
+  if (impl_->buffer_position() > 0) {
     RETURN_NOT_OK(Flush());
   }
-  RETURN_NOT_OK(AllocateCudaHostBuffer(buffer_size, &host_buffer_));
-  host_buffer_data_ = host_buffer_->mutable_data();
-  buffer_size_ = buffer_size;
-  return Status::OK();
+  return impl_->Seek(position);
 }
 
+Status CudaBufferWriter::Tell(int64_t* position) const { return impl_->Tell(position); }
+
+Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
+  return impl_->Write(data, nbytes);
+}
+
+Status CudaBufferWriter::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
+  return impl_->WriteAt(position, data, nbytes);
+}
+
+Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) {
+  return impl_->SetBufferSize(buffer_size);
+}
+
+int64_t CudaBufferWriter::buffer_size() const { return impl_->buffer_size(); }
+
+int64_t CudaBufferWriter::num_bytes_buffered() const { return impl_->buffer_position(); }
+
 // ----------------------------------------------------------------------
 
 Status AllocateCudaHostBuffer(const int64_t size, std::shared_ptr<CudaHostBuffer>* out) {

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/gpu/cuda_memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h
index d540737..9ebd2cc 100644
--- a/cpp/src/arrow/gpu/cuda_memory.h
+++ b/cpp/src/arrow/gpu/cuda_memory.h
@@ -145,7 +145,7 @@ class ARROW_EXPORT CudaBufferReader : public io::BufferReader {
 
 /// \class CudaBufferWriter
 /// \brief File interface for writing to CUDA buffers, with optional buffering
-class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
+class ARROW_EXPORT CudaBufferWriter : public io::WriteableFile {
  public:
   explicit CudaBufferWriter(const std::shared_ptr<CudaBuffer>& buffer);
   ~CudaBufferWriter();
@@ -156,10 +156,14 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
   /// \brief Flush buffered bytes to GPU
   Status Flush() override;
 
-  // Seek requires flushing if any bytes are buffered
   Status Seek(int64_t position) override;
+
   Status Write(const uint8_t* data, int64_t nbytes) override;
 
+  Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override;
+
+  Status Tell(int64_t* position) const override;
+
   /// \brief Set CPU buffer size to limit calls to cudaMemcpy
   /// \param[in] buffer_size the size of CPU buffer to allocate
   /// \return Status
@@ -168,19 +172,14 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter {
   Status SetBufferSize(const int64_t buffer_size);
 
   /// \brief Returns size of host (CPU) buffer, 0 for unbuffered
-  int64_t buffer_size() const { return buffer_size_; }
+  int64_t buffer_size() const;
 
   /// \brief Returns number of bytes buffered on host
-  int64_t num_bytes_buffered() const { return buffer_position_; }
+  int64_t num_bytes_buffered() const;
 
  private:
-  std::shared_ptr<CudaContext> context_;
-
-  // Pinned host buffer for buffering writes on CPU before calling cudaMalloc
-  int64_t buffer_size_;
-  int64_t buffer_position_;
-  std::shared_ptr<CudaHostBuffer> host_buffer_;
-  uint8_t* host_buffer_data_;
+  class CudaBufferWriterImpl;
+  std::unique_ptr<CudaBufferWriterImpl> impl_;
 };
 
 /// \brief Allocate CUDA-accessible memory on CPU host

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/file.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc
index ca53632..74c6c09 100644
--- a/cpp/src/arrow/io/file.cc
+++ b/cpp/src/arrow/io/file.cc
@@ -355,10 +355,15 @@ class OSFile {
   }
 
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
-    std::lock_guard<std::mutex> guard(lock_);
     return FileRead(fd_, out, nbytes, bytes_read);
   }
 
+  Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+    std::lock_guard<std::mutex> guard(lock_);
+    RETURN_NOT_OK(Seek(position));
+    return Read(nbytes, bytes_read, out);
+  }
+
   Status Seek(int64_t pos) {
     if (pos < 0) {
       return Status::Invalid("Invalid position");
@@ -384,6 +389,8 @@ class OSFile {
 
   FileMode::type mode() const { return mode_; }
 
+  std::mutex& lock() { return lock_; }
+
  protected:
   Status SetFileName(const std::string& file_name) {
 #if defined(_MSC_VER)
@@ -458,10 +465,24 @@ Status ReadableFile::Close() { return impl_->Close(); }
 Status ReadableFile::Tell(int64_t* pos) const { return impl_->Tell(pos); }
 
 Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) {
+  std::lock_guard<std::mutex> guard(impl_->lock());
   return impl_->Read(nbytes, bytes_read, out);
 }
 
+Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                            uint8_t* out) {
+  return impl_->ReadAt(position, nbytes, bytes_read, out);
+}
+
+Status ReadableFile::ReadAt(int64_t position, int64_t nbytes,
+                            std::shared_ptr<Buffer>* out) {
+  std::lock_guard<std::mutex> guard(impl_->lock());
+  RETURN_NOT_OK(Seek(position));
+  return impl_->ReadBuffer(nbytes, out);
+}
+
 Status ReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
+  std::lock_guard<std::mutex> guard(impl_->lock());
   return impl_->ReadBuffer(nbytes, out);
 }
 
@@ -590,6 +611,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer {
 
   int fd() const { return file_->fd(); }
 
+  std::mutex& lock() { return file_->lock(); }
+
  private:
   std::unique_ptr<OSFile> file_;
   int64_t position_;
@@ -671,10 +694,24 @@ Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
   return Status::OK();
 }
 
+Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                                uint8_t* out) {
+  std::lock_guard<std::mutex> guard(memory_map_->lock());
+  RETURN_NOT_OK(Seek(position));
+  return Read(nbytes, bytes_read, out);
+}
+
+Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes,
+                                std::shared_ptr<Buffer>* out) {
+  std::lock_guard<std::mutex> guard(memory_map_->lock());
+  RETURN_NOT_OK(Seek(position));
+  return Read(nbytes, out);
+}
+
 bool MemoryMappedFile::supports_zero_copy() const { return true; }
 
 Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
-  std::lock_guard<std::mutex> guard(lock_);
+  std::lock_guard<std::mutex> guard(memory_map_->lock());
 
   if (!memory_map_->opened() || !memory_map_->writable()) {
     return Status::IOError("Unable to write");
@@ -685,7 +722,7 @@ Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t
 }
 
 Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) {
-  std::lock_guard<std::mutex> guard(lock_);
+  std::lock_guard<std::mutex> guard(memory_map_->lock());
 
   if (!memory_map_->opened() || !memory_map_->writable()) {
     return Status::IOError("Unable to write");

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/file.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h
index 4fb0963..7937fea 100644
--- a/cpp/src/arrow/io/file.h
+++ b/cpp/src/arrow/io/file.h
@@ -96,6 +96,13 @@ class ARROW_EXPORT ReadableFile : public RandomAccessFile {
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override;
   Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
 
+  /// \brief Thread-safe implementation of ReadAt
+  Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                uint8_t* out) override;
+
+  /// \brief Thread-safe implementation of ReadAt
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
   Status GetSize(int64_t* size) override;
   Status Seek(int64_t position) override;
 
@@ -139,6 +146,12 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface {
   // Zero copy read. Not thread-safe
   Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
 
+  Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                uint8_t* out) override;
+
+  /// Default implementation is thread-safe
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
   bool supports_zero_copy() const override;
 
   /// Write data at the current position in the file. Thread-safe

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/interfaces.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc
index 694575b..499da6f 100644
--- a/cpp/src/arrow/io/interfaces.cc
+++ b/cpp/src/arrow/io/interfaces.cc
@@ -32,14 +32,12 @@ RandomAccessFile::RandomAccessFile() { set_mode(FileMode::READ); }
 
 Status RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
                                 uint8_t* out) {
-  std::lock_guard<std::mutex> guard(lock_);
   RETURN_NOT_OK(Seek(position));
   return Read(nbytes, bytes_read, out);
 }
 
 Status RandomAccessFile::ReadAt(int64_t position, int64_t nbytes,
                                 std::shared_ptr<Buffer>* out) {
-  std::lock_guard<std::mutex> guard(lock_);
   RETURN_NOT_OK(Seek(position));
   return Read(nbytes, out);
 }

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/interfaces.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h
index 2c5b351..eee4e95 100644
--- a/cpp/src/arrow/io/interfaces.h
+++ b/cpp/src/arrow/io/interfaces.h
@@ -20,7 +20,6 @@
 
 #include <cstdint>
 #include <memory>
-#include <mutex>
 #include <string>
 #include <vector>
 
@@ -131,18 +130,14 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable {
   /// Read at position, provide default implementations using Read(...), but can
   /// be overridden
   ///
-  /// Default implementation is thread-safe
+  /// Default implementation is not thread-safe
   virtual Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
                         uint8_t* out);
 
-  /// Default implementation is thread-safe
+  /// Default implementation is not thread-safe
   virtual Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out);
 
-  std::mutex& lock() { return lock_; }
-
  protected:
-  std::mutex lock_;
-
   RandomAccessFile();
 };
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/memory.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc
index 0b91ab5..370d3e9 100644
--- a/cpp/src/arrow/io/memory.cc
+++ b/cpp/src/arrow/io/memory.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
+#include <mutex>
 
 #include "arrow/buffer.h"
 #include "arrow/status.h"
@@ -127,67 +128,109 @@ static constexpr int kMemcopyDefaultNumThreads = 1;
 static constexpr int64_t kMemcopyDefaultBlocksize = 64;
 static constexpr int64_t kMemcopyDefaultThreshold = 1024 * 1024;
 
-/// Input buffer must be mutable, will abort if not
-FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer)
-    : memcopy_num_threads_(kMemcopyDefaultNumThreads),
-      memcopy_blocksize_(kMemcopyDefaultBlocksize),
-      memcopy_threshold_(kMemcopyDefaultThreshold) {
-  DCHECK(buffer) << "Buffer was nullptr";
-  buffer_ = buffer;
-  DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
-  mutable_data_ = buffer->mutable_data();
-  size_ = buffer->size();
-  position_ = 0;
-}
+class FixedSizeBufferWriter::FixedSizeBufferWriterImpl {
+ public:
+  /// Input buffer must be mutable, will abort if not
+
+  /// Input buffer must be mutable, will abort if not
+  explicit FixedSizeBufferWriterImpl(const std::shared_ptr<Buffer>& buffer)
+      : memcopy_num_threads_(kMemcopyDefaultNumThreads),
+        memcopy_blocksize_(kMemcopyDefaultBlocksize),
+        memcopy_threshold_(kMemcopyDefaultThreshold) {
+    buffer_ = buffer;
+    DCHECK(buffer->is_mutable()) << "Must pass mutable buffer";
+    mutable_data_ = buffer->mutable_data();
+    size_ = buffer->size();
+    position_ = 0;
+  }
 
-FixedSizeBufferWriter::~FixedSizeBufferWriter() {}
+  ~FixedSizeBufferWriterImpl() {}
 
-Status FixedSizeBufferWriter::Close() {
-  // no-op
-  return Status::OK();
-}
+  Status Close() {
+    // No-op
+    return Status::OK();
+  }
 
-Status FixedSizeBufferWriter::Seek(int64_t position) {
-  if (position < 0 || position >= size_) {
-    return Status::IOError("position out of bounds");
+  Status Seek(int64_t position) {
+    if (position < 0 || position >= size_) {
+      return Status::IOError("position out of bounds");
+    }
+    position_ = position;
+    return Status::OK();
   }
-  position_ = position;
-  return Status::OK();
-}
+
+  Status Tell(int64_t* position) {
+    *position = position_;
+    return Status::OK();
+  }
+
+  Status Write(const uint8_t* data, int64_t nbytes) {
+    if (nbytes > memcopy_threshold_ && memcopy_num_threads_ > 1) {
+      internal::parallel_memcopy(mutable_data_ + position_, data, nbytes,
+                                 memcopy_blocksize_, memcopy_num_threads_);
+    } else {
+      memcpy(mutable_data_ + position_, data, nbytes);
+    }
+    position_ += nbytes;
+    return Status::OK();
+  }
+
+  Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) {
+    std::lock_guard<std::mutex> guard(lock_);
+    RETURN_NOT_OK(Seek(position));
+    return Write(data, nbytes);
+  }
+
+  void set_memcopy_threads(int num_threads) { memcopy_num_threads_ = num_threads; }
+
+  void set_memcopy_blocksize(int64_t blocksize) { memcopy_blocksize_ = blocksize; }
+
+  void set_memcopy_threshold(int64_t threshold) { memcopy_threshold_ = threshold; }
+
+ private:
+  std::mutex lock_;
+  std::shared_ptr<Buffer> buffer_;
+  uint8_t* mutable_data_;
+  int64_t size_;
+  int64_t position_;
+
+  int memcopy_num_threads_;
+  int64_t memcopy_blocksize_;
+  int64_t memcopy_threshold_;
+};
+
+FixedSizeBufferWriter::~FixedSizeBufferWriter() {}
+
+FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr<Buffer>& buffer)
+    : impl_(new FixedSizeBufferWriterImpl(buffer)) {}
+
+Status FixedSizeBufferWriter::Close() { return impl_->Close(); }
+
+Status FixedSizeBufferWriter::Seek(int64_t position) { return impl_->Seek(position); }
 
 Status FixedSizeBufferWriter::Tell(int64_t* position) const {
-  *position = position_;
-  return Status::OK();
+  return impl_->Tell(position);
 }
 
 Status FixedSizeBufferWriter::Write(const uint8_t* data, int64_t nbytes) {
-  if (nbytes > memcopy_threshold_ && memcopy_num_threads_ > 1) {
-    internal::parallel_memcopy(mutable_data_ + position_, data, nbytes,
-                               memcopy_blocksize_, memcopy_num_threads_);
-  } else {
-    memcpy(mutable_data_ + position_, data, nbytes);
-  }
-  position_ += nbytes;
-  return Status::OK();
+  return impl_->Write(data, nbytes);
 }
 
 Status FixedSizeBufferWriter::WriteAt(int64_t position, const uint8_t* data,
                                       int64_t nbytes) {
-  std::lock_guard<std::mutex> guard(lock_);
-  RETURN_NOT_OK(Seek(position));
-  return Write(data, nbytes);
+  return impl_->WriteAt(position, data, nbytes);
 }
 
 void FixedSizeBufferWriter::set_memcopy_threads(int num_threads) {
-  memcopy_num_threads_ = num_threads;
+  impl_->set_memcopy_threads(num_threads);
 }
 
 void FixedSizeBufferWriter::set_memcopy_blocksize(int64_t blocksize) {
-  memcopy_blocksize_ = blocksize;
+  impl_->set_memcopy_blocksize(blocksize);
 }
 
 void FixedSizeBufferWriter::set_memcopy_threshold(int64_t threshold) {
-  memcopy_threshold_ = threshold;
+  impl_->set_memcopy_threshold(threshold);
 }
 
 // ----------------------------------------------------------------------
@@ -233,6 +276,18 @@ Status BufferReader::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
   return Status::OK();
 }
 
+Status BufferReader::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                            uint8_t* out) {
+  RETURN_NOT_OK(Seek(position));
+  return Read(nbytes, bytes_read, out);
+}
+
+Status BufferReader::ReadAt(int64_t position, int64_t nbytes,
+                            std::shared_ptr<Buffer>* out) {
+  RETURN_NOT_OK(Seek(position));
+  return Read(nbytes, out);
+}
+
 Status BufferReader::GetSize(int64_t* size) {
   *size = size_;
   return Status::OK();

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/memory.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h
index 563000f..978c198 100644
--- a/cpp/src/arrow/io/memory.h
+++ b/cpp/src/arrow/io/memory.h
@@ -22,7 +22,6 @@
 
 #include <cstdint>
 #include <memory>
-#include <mutex>
 
 #include "arrow/io/interfaces.h"
 #include "arrow/util/visibility.h"
@@ -99,15 +98,8 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile {
   void set_memcopy_threshold(int64_t threshold);
 
  protected:
-  std::mutex lock_;
-  std::shared_ptr<Buffer> buffer_;
-  uint8_t* mutable_data_;
-  int64_t size_;
-  int64_t position_;
-
-  int memcopy_num_threads_;
-  int64_t memcopy_blocksize_;
-  int64_t memcopy_threshold_;
+  class FixedSizeBufferWriterImpl;
+  std::unique_ptr<FixedSizeBufferWriterImpl> impl_;
 };
 
 /// \class BufferReader
@@ -125,6 +117,12 @@ class ARROW_EXPORT BufferReader : public RandomAccessFile {
   // Zero copy read
   Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
 
+  Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                uint8_t* out) override;
+
+  /// Default implementation is thread-safe
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
   Status GetSize(int64_t* size) override;
   Status Seek(int64_t position) override;
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/memory_pool-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool-test.cc b/cpp/src/arrow/memory_pool-test.cc
index 552c79b..0a4785d 100644
--- a/cpp/src/arrow/memory_pool-test.cc
+++ b/cpp/src/arrow/memory_pool-test.cc
@@ -59,39 +59,36 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) {
 }
 
 TEST(DefaultMemoryPoolDeathTest, MaxMemory) {
-  DefaultMemoryPool pool;
-
-  ASSERT_EQ(0, pool.max_memory());
+  MemoryPool* pool = default_memory_pool();
 
   uint8_t* data;
-  ASSERT_OK(pool.Allocate(100, &data));
+  ASSERT_OK(pool->Allocate(100, &data));
 
   uint8_t* data2;
-  ASSERT_OK(pool.Allocate(100, &data2));
+  ASSERT_OK(pool->Allocate(100, &data2));
 
-  pool.Free(data, 100);
-  pool.Free(data2, 100);
+  pool->Free(data, 100);
+  pool->Free(data2, 100);
 
-  ASSERT_EQ(200, pool.max_memory());
+  ASSERT_EQ(200, pool->max_memory());
 }
 
 #endif  // ARROW_VALGRIND
 
 TEST(LoggingMemoryPool, Logging) {
-  DefaultMemoryPool pool;
-  LoggingMemoryPool lp(&pool);
+  MemoryPool* pool = default_memory_pool();
 
-  ASSERT_EQ(0, lp.max_memory());
+  LoggingMemoryPool lp(pool);
 
   uint8_t* data;
-  ASSERT_OK(pool.Allocate(100, &data));
+  ASSERT_OK(pool->Allocate(100, &data));
 
   uint8_t* data2;
-  ASSERT_OK(pool.Allocate(100, &data2));
+  ASSERT_OK(pool->Allocate(100, &data2));
 
-  pool.Free(data, 100);
-  pool.Free(data2, 100);
+  pool->Free(data, 100);
+  pool->Free(data2, 100);
 
-  ASSERT_EQ(200, pool.max_memory());
+  ASSERT_EQ(200, pool->max_memory());
 }
 }  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/memory_pool.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc
index 851065b..3496636 100644
--- a/cpp/src/arrow/memory_pool.cc
+++ b/cpp/src/arrow/memory_pool.cc
@@ -85,74 +85,82 @@ MemoryPool::~MemoryPool() {}
 
 int64_t MemoryPool::max_memory() const { return -1; }
 
-DefaultMemoryPool::DefaultMemoryPool() : bytes_allocated_(0) { max_memory_ = 0; }
+class DefaultMemoryPool : public MemoryPool {
+ public:
+  DefaultMemoryPool() : bytes_allocated_(0) { max_memory_ = 0; }
 
-Status DefaultMemoryPool::Allocate(int64_t size, uint8_t** out) {
-  RETURN_NOT_OK(AllocateAligned(size, out));
-  bytes_allocated_ += size;
+  ~DefaultMemoryPool() {}
 
-  {
-    std::lock_guard<std::mutex> guard(lock_);
-    if (bytes_allocated_ > max_memory_) {
-      max_memory_ = bytes_allocated_.load();
+  Status Allocate(int64_t size, uint8_t** out) override {
+    RETURN_NOT_OK(AllocateAligned(size, out));
+    bytes_allocated_ += size;
+
+    {
+      std::lock_guard<std::mutex> guard(lock_);
+      if (bytes_allocated_ > max_memory_) {
+        max_memory_ = bytes_allocated_.load();
+      }
     }
+    return Status::OK();
   }
-  return Status::OK();
-}
 
-Status DefaultMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) {
+  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override {
 #ifdef ARROW_JEMALLOC
-  *ptr = reinterpret_cast<uint8_t*>(rallocx(*ptr, new_size, MALLOCX_ALIGN(kAlignment)));
-  if (*ptr == NULL) {
-    std::stringstream ss;
-    ss << "realloc of size " << new_size << " failed";
-    return Status::OutOfMemory(ss.str());
-  }
+    *ptr = reinterpret_cast<uint8_t*>(rallocx(*ptr, new_size, MALLOCX_ALIGN(kAlignment)));
+    if (*ptr == NULL) {
+      std::stringstream ss;
+      ss << "realloc of size " << new_size << " failed";
+      return Status::OutOfMemory(ss.str());
+    }
 #else
-  // Note: We cannot use realloc() here as it doesn't guarantee alignment.
-
-  // Allocate new chunk
-  uint8_t* out = nullptr;
-  RETURN_NOT_OK(AllocateAligned(new_size, &out));
-  DCHECK(out);
-  // Copy contents and release old memory chunk
-  memcpy(out, *ptr, static_cast<size_t>(std::min(new_size, old_size)));
+    // Note: We cannot use realloc() here as it doesn't guarantee alignment.
+
+    // Allocate new chunk
+    uint8_t* out = nullptr;
+    RETURN_NOT_OK(AllocateAligned(new_size, &out));
+    DCHECK(out);
+    // Copy contents and release old memory chunk
+    memcpy(out, *ptr, static_cast<size_t>(std::min(new_size, old_size)));
 #ifdef _MSC_VER
-  _aligned_free(*ptr);
+    _aligned_free(*ptr);
 #else
-  std::free(*ptr);
+    std::free(*ptr);
 #endif  // defined(_MSC_VER)
-  *ptr = out;
+    *ptr = out;
 #endif  // defined(ARROW_JEMALLOC)
 
-  bytes_allocated_ += new_size - old_size;
-  {
-    std::lock_guard<std::mutex> guard(lock_);
-    if (bytes_allocated_ > max_memory_) {
-      max_memory_ = bytes_allocated_.load();
+    bytes_allocated_ += new_size - old_size;
+    {
+      std::lock_guard<std::mutex> guard(lock_);
+      if (bytes_allocated_ > max_memory_) {
+        max_memory_ = bytes_allocated_.load();
+      }
     }
-  }
 
-  return Status::OK();
-}
+    return Status::OK();
+  }
 
-int64_t DefaultMemoryPool::bytes_allocated() const { return bytes_allocated_.load(); }
+  int64_t bytes_allocated() const override { return bytes_allocated_.load(); }
 
-void DefaultMemoryPool::Free(uint8_t* buffer, int64_t size) {
-  DCHECK_GE(bytes_allocated_, size);
+  void Free(uint8_t* buffer, int64_t size) override {
+    DCHECK_GE(bytes_allocated_, size);
 #ifdef _MSC_VER
-  _aligned_free(buffer);
+    _aligned_free(buffer);
 #elif defined(ARROW_JEMALLOC)
-  dallocx(buffer, MALLOCX_ALIGN(kAlignment));
+    dallocx(buffer, MALLOCX_ALIGN(kAlignment));
 #else
-  std::free(buffer);
+    std::free(buffer);
 #endif
-  bytes_allocated_ -= size;
-}
+    bytes_allocated_ -= size;
+  }
 
-int64_t DefaultMemoryPool::max_memory() const { return max_memory_.load(); }
+  int64_t max_memory() const override { return max_memory_.load(); }
 
-DefaultMemoryPool::~DefaultMemoryPool() {}
+ private:
+  mutable std::mutex lock_;
+  std::atomic<int64_t> bytes_allocated_;
+  std::atomic<int64_t> max_memory_;
+};
 
 MemoryPool* default_memory_pool() {
   static DefaultMemoryPool default_memory_pool_;

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/memory_pool.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h
index 5bb2b56..52ec67f 100644
--- a/cpp/src/arrow/memory_pool.h
+++ b/cpp/src/arrow/memory_pool.h
@@ -20,7 +20,6 @@
 
 #include <atomic>
 #include <cstdint>
-#include <mutex>
 
 #include "arrow/util/visibility.h"
 
@@ -69,26 +68,6 @@ class ARROW_EXPORT MemoryPool {
   MemoryPool();
 };
 
-class ARROW_EXPORT DefaultMemoryPool : public MemoryPool {
- public:
-  DefaultMemoryPool();
-  virtual ~DefaultMemoryPool();
-
-  Status Allocate(int64_t size, uint8_t** out) override;
-  Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
-
-  void Free(uint8_t* buffer, int64_t size) override;
-
-  int64_t bytes_allocated() const override;
-
-  int64_t max_memory() const override;
-
- private:
-  mutable std::mutex lock_;
-  std::atomic<int64_t> bytes_allocated_;
-  std::atomic<int64_t> max_memory_;
-};
-
 class ARROW_EXPORT LoggingMemoryPool : public MemoryPool {
  public:
   explicit LoggingMemoryPool(MemoryPool* pool);

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/python/io.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc
index 99b99f7..b01358a 100644
--- a/cpp/src/arrow/python/io.cc
+++ b/cpp/src/arrow/python/io.cc
@@ -19,6 +19,7 @@
 
 #include <cstdint>
 #include <cstdlib>
+#include <mutex>
 #include <string>
 
 #include "arrow/io/memory.h"
@@ -33,10 +34,6 @@ namespace py {
 // ----------------------------------------------------------------------
 // Python file
 
-PythonFile::PythonFile(PyObject* file) : file_(file) { Py_INCREF(file_); }
-
-PythonFile::~PythonFile() { Py_DECREF(file_); }
-
 // This is annoying: because C++11 does not allow implicit conversion of string
 // literals to non-const char*, we need to go through some gymnastics to use
 // PyObject_CallMethod without a lot of pain (its arguments are non-const
@@ -48,53 +45,68 @@ static inline PyObject* cpp_PyObject_CallMethod(PyObject* obj, const char* metho
                              const_cast<char*>(argspec), args...);
 }
 
-Status PythonFile::Close() {
-  // whence: 0 for relative to start of file, 2 for end of file
-  PyObject* result = cpp_PyObject_CallMethod(file_, "close", "()");
-  Py_XDECREF(result);
-  PY_RETURN_IF_ERROR(StatusCode::IOError);
-  return Status::OK();
-}
-
-Status PythonFile::Seek(int64_t position, int whence) {
-  // whence: 0 for relative to start of file, 2 for end of file
-  PyObject* result = cpp_PyObject_CallMethod(file_, "seek", "(ii)", position, whence);
-  Py_XDECREF(result);
-  PY_RETURN_IF_ERROR(StatusCode::IOError);
-  return Status::OK();
-}
-
-Status PythonFile::Read(int64_t nbytes, PyObject** out) {
-  PyObject* result = cpp_PyObject_CallMethod(file_, "read", "(i)", nbytes);
-  PY_RETURN_IF_ERROR(StatusCode::IOError);
-  *out = result;
-  return Status::OK();
-}
-
-Status PythonFile::Write(const uint8_t* data, int64_t nbytes) {
-  PyObject* py_data =
-      PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), nbytes);
-  PY_RETURN_IF_ERROR(StatusCode::IOError);
-
-  PyObject* result = cpp_PyObject_CallMethod(file_, "write", "(O)", py_data);
-  Py_XDECREF(py_data);
-  Py_XDECREF(result);
-  PY_RETURN_IF_ERROR(StatusCode::IOError);
-  return Status::OK();
-}
-
-Status PythonFile::Tell(int64_t* position) {
-  PyObject* result = cpp_PyObject_CallMethod(file_, "tell", "()");
-  PY_RETURN_IF_ERROR(StatusCode::IOError);
-
-  *position = PyLong_AsLongLong(result);
-  Py_DECREF(result);
-
-  // PyLong_AsLongLong can raise OverflowError
-  PY_RETURN_IF_ERROR(StatusCode::IOError);
-
-  return Status::OK();
-}
+// A common interface to a Python file-like object. Must acquire GIL before
+// calling any methods
+class PythonFile {
+ public:
+  explicit PythonFile(PyObject* file) : file_(file) { Py_INCREF(file_); }
+
+  ~PythonFile() { Py_DECREF(file_); }
+
+  Status Close() {
+    // whence: 0 for relative to start of file, 2 for end of file
+    PyObject* result = cpp_PyObject_CallMethod(file_, "close", "()");
+    Py_XDECREF(result);
+    PY_RETURN_IF_ERROR(StatusCode::IOError);
+    return Status::OK();
+  }
+
+  Status Seek(int64_t position, int whence) {
+    // whence: 0 for relative to start of file, 2 for end of file
+    PyObject* result = cpp_PyObject_CallMethod(file_, "seek", "(ii)", position, whence);
+    Py_XDECREF(result);
+    PY_RETURN_IF_ERROR(StatusCode::IOError);
+    return Status::OK();
+  }
+
+  Status Read(int64_t nbytes, PyObject** out) {
+    PyObject* result = cpp_PyObject_CallMethod(file_, "read", "(i)", nbytes);
+    PY_RETURN_IF_ERROR(StatusCode::IOError);
+    *out = result;
+    return Status::OK();
+  }
+
+  Status Write(const uint8_t* data, int64_t nbytes) {
+    PyObject* py_data =
+        PyBytes_FromStringAndSize(reinterpret_cast<const char*>(data), nbytes);
+    PY_RETURN_IF_ERROR(StatusCode::IOError);
+
+    PyObject* result = cpp_PyObject_CallMethod(file_, "write", "(O)", py_data);
+    Py_XDECREF(py_data);
+    Py_XDECREF(result);
+    PY_RETURN_IF_ERROR(StatusCode::IOError);
+    return Status::OK();
+  }
+
+  Status Tell(int64_t* position) {
+    PyObject* result = cpp_PyObject_CallMethod(file_, "tell", "()");
+    PY_RETURN_IF_ERROR(StatusCode::IOError);
+
+    *position = PyLong_AsLongLong(result);
+    Py_DECREF(result);
+
+    // PyLong_AsLongLong can raise OverflowError
+    PY_RETURN_IF_ERROR(StatusCode::IOError);
+
+    return Status::OK();
+  }
+
+  std::mutex& lock() { return lock_; }
+
+ private:
+  std::mutex lock_;
+  PyObject* file_;
+};
 
 // ----------------------------------------------------------------------
 // Seekable input stream
@@ -142,6 +154,20 @@ Status PyReadableFile::Read(int64_t nbytes, std::shared_ptr<Buffer>* out) {
   return Status::OK();
 }
 
+Status PyReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                              uint8_t* out) {
+  std::lock_guard<std::mutex> guard(file_->lock());
+  RETURN_NOT_OK(Seek(position));
+  return Read(nbytes, bytes_read, out);
+}
+
+Status PyReadableFile::ReadAt(int64_t position, int64_t nbytes,
+                              std::shared_ptr<Buffer>* out) {
+  std::lock_guard<std::mutex> guard(file_->lock());
+  RETURN_NOT_OK(Seek(position));
+  return Read(nbytes, out);
+}
+
 Status PyReadableFile::GetSize(int64_t* size) {
   PyAcquireGIL lock;
 

http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/python/io.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/python/io.h b/cpp/src/arrow/python/io.h
index eda7346..bf5db53 100644
--- a/cpp/src/arrow/python/io.h
+++ b/cpp/src/arrow/python/io.h
@@ -32,22 +32,7 @@ class MemoryPool;
 
 namespace py {
 
-// A common interface to a Python file-like object. Must acquire GIL before
-// calling any methods
-class ARROW_EXPORT PythonFile {
- public:
-  explicit PythonFile(PyObject* file);
-  ~PythonFile();
-
-  Status Close();
-  Status Seek(int64_t position, int whence);
-  Status Read(int64_t nbytes, PyObject** out);
-  Status Tell(int64_t* position);
-  Status Write(const uint8_t* data, int64_t nbytes);
-
- private:
-  PyObject* file_;
-};
+class ARROW_NO_EXPORT PythonFile;
 
 class ARROW_EXPORT PyReadableFile : public io::RandomAccessFile {
  public:
@@ -59,6 +44,13 @@ class ARROW_EXPORT PyReadableFile : public io::RandomAccessFile {
   Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override;
   Status Read(int64_t nbytes, std::shared_ptr<Buffer>* out) override;
 
+  // Thread-safe version
+  Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read,
+                uint8_t* out) override;
+
+  // Thread-safe version
+  Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr<Buffer>* out) override;
+
   Status GetSize(int64_t* size) override;
 
   Status Seek(int64_t position) override;


Mime
View raw message