Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 2FCE1200D12 for ; Sat, 7 Oct 2017 21:48:32 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 2E39A1609DF; Sat, 7 Oct 2017 19:48:32 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7B7611609DA for ; Sat, 7 Oct 2017 21:48:30 +0200 (CEST) Received: (qmail 91147 invoked by uid 500); 7 Oct 2017 19:48:29 -0000 Mailing-List: contact commits-help@arrow.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@arrow.apache.org Delivered-To: mailing list commits@arrow.apache.org Received: (qmail 91138 invoked by uid 99); 7 Oct 2017 19:48:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sat, 07 Oct 2017 19:48:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 7E3DBE7DA2; Sat, 7 Oct 2017 19:48:29 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wesm@apache.org To: commits@arrow.apache.org Message-Id: <0c88610254bb4f73bfe39a3a0ec12ea1@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: arrow git commit: ARROW-1641: [C++] Hide std::mutex from public headers Date: Sat, 7 Oct 2017 19:48:29 +0000 (UTC) archived-at: Sat, 07 Oct 2017 19:48:32 -0000 Repository: arrow Updated Branches: refs/heads/master 8309556c7 -> b29b06586 ARROW-1641: [C++] Hide std::mutex from public headers This was one part of ARROW-1134 that we can push through. I had to do some refactoring since there was a mutex in one of the base file interfaces. It doesn't appear that this will impact parquet-cpp or other Arrow users Author: Wes McKinney Closes #1165 from wesm/ARROW-1641 and squashes the following commits: 9f4a466d [Wes McKinney] Make PyReadableFile C++ threadsafe also 002c5a62 [Wes McKinney] Make ReadableFile::Read threadsafe again e373ff2a [Wes McKinney] Rewrite CudaBufferWriter to not inherit from FixedSizeBufferWriter. Fix deadlocks in ReadableFile 6ab45e8f [Wes McKinney] Hide mutexes from public headers Project: http://git-wip-us.apache.org/repos/asf/arrow/repo Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/b29b0658 Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/b29b0658 Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/b29b0658 Branch: refs/heads/master Commit: b29b06586ff89bee93062ad082a930d30d5f45df Parents: 8309556 Author: Wes McKinney Authored: Sat Oct 7 15:48:18 2017 -0400 Committer: Wes McKinney Committed: Sat Oct 7 15:48:18 2017 -0400 ---------------------------------------------------------------------- cpp/src/arrow/allocator-test.cc | 15 ++- cpp/src/arrow/array.h | 2 +- cpp/src/arrow/gpu/cuda_memory.cc | 161 +++++++++++++++++++++++---------- cpp/src/arrow/gpu/cuda_memory.h | 21 ++--- cpp/src/arrow/io/file.cc | 43 ++++++++- cpp/src/arrow/io/file.h | 13 +++ cpp/src/arrow/io/interfaces.cc | 2 - cpp/src/arrow/io/interfaces.h | 9 +- cpp/src/arrow/io/memory.cc | 133 +++++++++++++++++++-------- cpp/src/arrow/io/memory.h | 18 ++-- cpp/src/arrow/memory_pool-test.cc | 29 +++--- cpp/src/arrow/memory_pool.cc | 100 ++++++++++---------- cpp/src/arrow/memory_pool.h | 21 ----- cpp/src/arrow/python/io.cc | 128 +++++++++++++++----------- cpp/src/arrow/python/io.h | 24 ++--- 15 files changed, 442 insertions(+), 277 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/allocator-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/allocator-test.cc b/cpp/src/arrow/allocator-test.cc index f3af47c..7c768be 100644 --- a/cpp/src/arrow/allocator-test.cc +++ b/cpp/src/arrow/allocator-test.cc @@ -59,17 +59,16 @@ TEST(stl_allocator, FreeLargeMemory) { } TEST(stl_allocator, MaxMemory) { - DefaultMemoryPool pool; + auto pool = default_memory_pool(); - ASSERT_EQ(0, pool.max_memory()); - stl_allocator alloc(&pool); - uint8_t* data = alloc.allocate(100); - uint8_t* data2 = alloc.allocate(100); + stl_allocator alloc(pool); + uint8_t* data = alloc.allocate(1000); + uint8_t* data2 = alloc.allocate(1000); - alloc.deallocate(data, 100); - alloc.deallocate(data2, 100); + alloc.deallocate(data, 1000); + alloc.deallocate(data2, 1000); - ASSERT_EQ(200, pool.max_memory()); + ASSERT_EQ(2000, pool->max_memory()); } #endif // ARROW_VALGRIND http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/array.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/array.h b/cpp/src/arrow/array.h index 975cd5b..36bceeb 100644 --- a/cpp/src/arrow/array.h +++ b/cpp/src/arrow/array.h @@ -179,7 +179,7 @@ class ARROW_EXPORT Array { /// boundscheck bool IsValid(int64_t i) const { return null_bitmap_data_ != nullptr && - BitUtil::GetBit(null_bitmap_data_, i + data_->offset); + BitUtil::GetBit(null_bitmap_data_, i + data_->offset); } /// Size in the number of elements this array contains. http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/gpu/cuda_memory.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/gpu/cuda_memory.cc b/cpp/src/arrow/gpu/cuda_memory.cc index d1026ca..949c1d7 100644 --- a/cpp/src/arrow/gpu/cuda_memory.cc +++ b/cpp/src/arrow/gpu/cuda_memory.cc @@ -21,6 +21,7 @@ #include #include #include +#include #include @@ -151,71 +152,139 @@ Status CudaBufferReader::Read(int64_t nbytes, std::shared_ptr* out) { // ---------------------------------------------------------------------- // CudaBufferWriter -CudaBufferWriter::CudaBufferWriter(const std::shared_ptr& buffer) - : io::FixedSizeBufferWriter(buffer), - context_(buffer->context()), - buffer_size_(0), - buffer_position_(0) {} - -CudaBufferWriter::~CudaBufferWriter() {} - -Status CudaBufferWriter::Close() { return Flush(); } +class CudaBufferWriter::CudaBufferWriterImpl { + public: + explicit CudaBufferWriterImpl(const std::shared_ptr& buffer) + : context_(buffer->context()), + buffer_(buffer), + buffer_size_(0), + buffer_position_(0) { + buffer_ = buffer; + DCHECK(buffer->is_mutable()) << "Must pass mutable buffer"; + mutable_data_ = buffer->mutable_data(); + size_ = buffer->size(); + position_ = 0; + } -Status CudaBufferWriter::Flush() { - if (buffer_size_ > 0 && buffer_position_ > 0) { - // Only need to flush when the write has been buffered - RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_ - buffer_position_, - host_buffer_data_, buffer_position_)); - buffer_position_ = 0; + Status Seek(int64_t position) { + if (position < 0 || position >= size_) { + return Status::IOError("position out of bounds"); + } + position_ = position; + return Status::OK(); } - return Status::OK(); -} -Status CudaBufferWriter::Seek(int64_t position) { - if (buffer_position_ > 0) { - RETURN_NOT_OK(Flush()); + Status Flush() { + if (buffer_size_ > 0 && buffer_position_ > 0) { + // Only need to flush when the write has been buffered + RETURN_NOT_OK( + context_->CopyHostToDevice(mutable_data_ + position_ - buffer_position_, + host_buffer_data_, buffer_position_)); + buffer_position_ = 0; + } + return Status::OK(); } - return io::FixedSizeBufferWriter::Seek(position); -} -Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) { - if (memcopy_num_threads_ > 1) { - return Status::Invalid("parallel CUDA memcpy not supported"); + Status Tell(int64_t* position) const { + *position = position_; + return Status::OK(); } - if (nbytes == 0) { + Status Write(const uint8_t* data, int64_t nbytes) { + if (nbytes == 0) { + return Status::OK(); + } + + if (buffer_size_ > 0) { + if (nbytes + buffer_position_ >= buffer_size_) { + // Reach end of buffer, write everything + RETURN_NOT_OK(Flush()); + RETURN_NOT_OK( + context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes)); + } else { + // Write bytes to buffer + std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes); + buffer_position_ += nbytes; + } + } else { + // Unbuffered write + RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes)); + } + position_ += nbytes; return Status::OK(); } - if (buffer_size_ > 0) { - if (nbytes + buffer_position_ >= buffer_size_) { - // Reach end of buffer, write everything + Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) { + std::lock_guard guard(lock_); + RETURN_NOT_OK(Seek(position)); + return Write(data, nbytes); + } + + Status SetBufferSize(const int64_t buffer_size) { + if (buffer_position_ > 0) { + // Flush any buffered data RETURN_NOT_OK(Flush()); - RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes)); - } else { - // Write bytes to buffer - std::memcpy(host_buffer_data_ + buffer_position_, data, nbytes); - buffer_position_ += nbytes; } - } else { - // Unbuffered write - RETURN_NOT_OK(context_->CopyHostToDevice(mutable_data_ + position_, data, nbytes)); + RETURN_NOT_OK(AllocateCudaHostBuffer(buffer_size, &host_buffer_)); + host_buffer_data_ = host_buffer_->mutable_data(); + buffer_size_ = buffer_size; + return Status::OK(); } - position_ += nbytes; - return Status::OK(); + + int64_t buffer_size() const { return buffer_size_; } + + int64_t buffer_position() const { return buffer_position_; } + + private: + std::shared_ptr context_; + std::shared_ptr buffer_; + std::mutex lock_; + uint8_t* mutable_data_; + int64_t size_; + int64_t position_; + + // Pinned host buffer for buffering writes on CPU before calling cudaMalloc + int64_t buffer_size_; + int64_t buffer_position_; + std::shared_ptr host_buffer_; + uint8_t* host_buffer_data_; +}; + +CudaBufferWriter::CudaBufferWriter(const std::shared_ptr& buffer) { + impl_.reset(new CudaBufferWriterImpl(buffer)); } -Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) { - if (buffer_position_ > 0) { - // Flush any buffered data +CudaBufferWriter::~CudaBufferWriter() {} + +Status CudaBufferWriter::Close() { return Flush(); } + +Status CudaBufferWriter::Flush() { return impl_->Flush(); } + +Status CudaBufferWriter::Seek(int64_t position) { + if (impl_->buffer_position() > 0) { RETURN_NOT_OK(Flush()); } - RETURN_NOT_OK(AllocateCudaHostBuffer(buffer_size, &host_buffer_)); - host_buffer_data_ = host_buffer_->mutable_data(); - buffer_size_ = buffer_size; - return Status::OK(); + return impl_->Seek(position); } +Status CudaBufferWriter::Tell(int64_t* position) const { return impl_->Tell(position); } + +Status CudaBufferWriter::Write(const uint8_t* data, int64_t nbytes) { + return impl_->Write(data, nbytes); +} + +Status CudaBufferWriter::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) { + return impl_->WriteAt(position, data, nbytes); +} + +Status CudaBufferWriter::SetBufferSize(const int64_t buffer_size) { + return impl_->SetBufferSize(buffer_size); +} + +int64_t CudaBufferWriter::buffer_size() const { return impl_->buffer_size(); } + +int64_t CudaBufferWriter::num_bytes_buffered() const { return impl_->buffer_position(); } + // ---------------------------------------------------------------------- Status AllocateCudaHostBuffer(const int64_t size, std::shared_ptr* out) { http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/gpu/cuda_memory.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/gpu/cuda_memory.h b/cpp/src/arrow/gpu/cuda_memory.h index d540737..9ebd2cc 100644 --- a/cpp/src/arrow/gpu/cuda_memory.h +++ b/cpp/src/arrow/gpu/cuda_memory.h @@ -145,7 +145,7 @@ class ARROW_EXPORT CudaBufferReader : public io::BufferReader { /// \class CudaBufferWriter /// \brief File interface for writing to CUDA buffers, with optional buffering -class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter { +class ARROW_EXPORT CudaBufferWriter : public io::WriteableFile { public: explicit CudaBufferWriter(const std::shared_ptr& buffer); ~CudaBufferWriter(); @@ -156,10 +156,14 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter { /// \brief Flush buffered bytes to GPU Status Flush() override; - // Seek requires flushing if any bytes are buffered Status Seek(int64_t position) override; + Status Write(const uint8_t* data, int64_t nbytes) override; + Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) override; + + Status Tell(int64_t* position) const override; + /// \brief Set CPU buffer size to limit calls to cudaMemcpy /// \param[in] buffer_size the size of CPU buffer to allocate /// \return Status @@ -168,19 +172,14 @@ class ARROW_EXPORT CudaBufferWriter : public io::FixedSizeBufferWriter { Status SetBufferSize(const int64_t buffer_size); /// \brief Returns size of host (CPU) buffer, 0 for unbuffered - int64_t buffer_size() const { return buffer_size_; } + int64_t buffer_size() const; /// \brief Returns number of bytes buffered on host - int64_t num_bytes_buffered() const { return buffer_position_; } + int64_t num_bytes_buffered() const; private: - std::shared_ptr context_; - - // Pinned host buffer for buffering writes on CPU before calling cudaMalloc - int64_t buffer_size_; - int64_t buffer_position_; - std::shared_ptr host_buffer_; - uint8_t* host_buffer_data_; + class CudaBufferWriterImpl; + std::unique_ptr impl_; }; /// \brief Allocate CUDA-accessible memory on CPU host http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/file.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/file.cc b/cpp/src/arrow/io/file.cc index ca53632..74c6c09 100644 --- a/cpp/src/arrow/io/file.cc +++ b/cpp/src/arrow/io/file.cc @@ -355,10 +355,15 @@ class OSFile { } Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { - std::lock_guard guard(lock_); return FileRead(fd_, out, nbytes, bytes_read); } + Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + std::lock_guard guard(lock_); + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, out); + } + Status Seek(int64_t pos) { if (pos < 0) { return Status::Invalid("Invalid position"); @@ -384,6 +389,8 @@ class OSFile { FileMode::type mode() const { return mode_; } + std::mutex& lock() { return lock_; } + protected: Status SetFileName(const std::string& file_name) { #if defined(_MSC_VER) @@ -458,10 +465,24 @@ Status ReadableFile::Close() { return impl_->Close(); } Status ReadableFile::Tell(int64_t* pos) const { return impl_->Tell(pos); } Status ReadableFile::Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) { + std::lock_guard guard(impl_->lock()); return impl_->Read(nbytes, bytes_read, out); } +Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + uint8_t* out) { + return impl_->ReadAt(position, nbytes, bytes_read, out); +} + +Status ReadableFile::ReadAt(int64_t position, int64_t nbytes, + std::shared_ptr* out) { + std::lock_guard guard(impl_->lock()); + RETURN_NOT_OK(Seek(position)); + return impl_->ReadBuffer(nbytes, out); +} + Status ReadableFile::Read(int64_t nbytes, std::shared_ptr* out) { + std::lock_guard guard(impl_->lock()); return impl_->ReadBuffer(nbytes, out); } @@ -590,6 +611,8 @@ class MemoryMappedFile::MemoryMap : public MutableBuffer { int fd() const { return file_->fd(); } + std::mutex& lock() { return file_->lock(); } + private: std::unique_ptr file_; int64_t position_; @@ -671,10 +694,24 @@ Status MemoryMappedFile::Read(int64_t nbytes, std::shared_ptr* out) { return Status::OK(); } +Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + uint8_t* out) { + std::lock_guard guard(memory_map_->lock()); + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, out); +} + +Status MemoryMappedFile::ReadAt(int64_t position, int64_t nbytes, + std::shared_ptr* out) { + std::lock_guard guard(memory_map_->lock()); + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, out); +} + bool MemoryMappedFile::supports_zero_copy() const { return true; } Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) { - std::lock_guard guard(lock_); + std::lock_guard guard(memory_map_->lock()); if (!memory_map_->opened() || !memory_map_->writable()) { return Status::IOError("Unable to write"); @@ -685,7 +722,7 @@ Status MemoryMappedFile::WriteAt(int64_t position, const uint8_t* data, int64_t } Status MemoryMappedFile::Write(const uint8_t* data, int64_t nbytes) { - std::lock_guard guard(lock_); + std::lock_guard guard(memory_map_->lock()); if (!memory_map_->opened() || !memory_map_->writable()) { return Status::IOError("Unable to write"); http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/file.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/file.h b/cpp/src/arrow/io/file.h index 4fb0963..7937fea 100644 --- a/cpp/src/arrow/io/file.h +++ b/cpp/src/arrow/io/file.h @@ -96,6 +96,13 @@ class ARROW_EXPORT ReadableFile : public RandomAccessFile { Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* buffer) override; Status Read(int64_t nbytes, std::shared_ptr* out) override; + /// \brief Thread-safe implementation of ReadAt + Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + uint8_t* out) override; + + /// \brief Thread-safe implementation of ReadAt + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + Status GetSize(int64_t* size) override; Status Seek(int64_t position) override; @@ -139,6 +146,12 @@ class ARROW_EXPORT MemoryMappedFile : public ReadWriteFileInterface { // Zero copy read. Not thread-safe Status Read(int64_t nbytes, std::shared_ptr* out) override; + Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + uint8_t* out) override; + + /// Default implementation is thread-safe + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + bool supports_zero_copy() const override; /// Write data at the current position in the file. Thread-safe http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/interfaces.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/interfaces.cc b/cpp/src/arrow/io/interfaces.cc index 694575b..499da6f 100644 --- a/cpp/src/arrow/io/interfaces.cc +++ b/cpp/src/arrow/io/interfaces.cc @@ -32,14 +32,12 @@ RandomAccessFile::RandomAccessFile() { set_mode(FileMode::READ); } Status RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out) { - std::lock_guard guard(lock_); RETURN_NOT_OK(Seek(position)); return Read(nbytes, bytes_read, out); } Status RandomAccessFile::ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) { - std::lock_guard guard(lock_); RETURN_NOT_OK(Seek(position)); return Read(nbytes, out); } http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/interfaces.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/interfaces.h b/cpp/src/arrow/io/interfaces.h index 2c5b351..eee4e95 100644 --- a/cpp/src/arrow/io/interfaces.h +++ b/cpp/src/arrow/io/interfaces.h @@ -20,7 +20,6 @@ #include #include -#include #include #include @@ -131,18 +130,14 @@ class ARROW_EXPORT RandomAccessFile : public InputStream, public Seekable { /// Read at position, provide default implementations using Read(...), but can /// be overridden /// - /// Default implementation is thread-safe + /// Default implementation is not thread-safe virtual Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, uint8_t* out); - /// Default implementation is thread-safe + /// Default implementation is not thread-safe virtual Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out); - std::mutex& lock() { return lock_; } - protected: - std::mutex lock_; - RandomAccessFile(); }; http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/memory.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.cc b/cpp/src/arrow/io/memory.cc index 0b91ab5..370d3e9 100644 --- a/cpp/src/arrow/io/memory.cc +++ b/cpp/src/arrow/io/memory.cc @@ -20,6 +20,7 @@ #include #include #include +#include #include "arrow/buffer.h" #include "arrow/status.h" @@ -127,67 +128,109 @@ static constexpr int kMemcopyDefaultNumThreads = 1; static constexpr int64_t kMemcopyDefaultBlocksize = 64; static constexpr int64_t kMemcopyDefaultThreshold = 1024 * 1024; -/// Input buffer must be mutable, will abort if not -FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr& buffer) - : memcopy_num_threads_(kMemcopyDefaultNumThreads), - memcopy_blocksize_(kMemcopyDefaultBlocksize), - memcopy_threshold_(kMemcopyDefaultThreshold) { - DCHECK(buffer) << "Buffer was nullptr"; - buffer_ = buffer; - DCHECK(buffer->is_mutable()) << "Must pass mutable buffer"; - mutable_data_ = buffer->mutable_data(); - size_ = buffer->size(); - position_ = 0; -} +class FixedSizeBufferWriter::FixedSizeBufferWriterImpl { + public: + /// Input buffer must be mutable, will abort if not + + /// Input buffer must be mutable, will abort if not + explicit FixedSizeBufferWriterImpl(const std::shared_ptr& buffer) + : memcopy_num_threads_(kMemcopyDefaultNumThreads), + memcopy_blocksize_(kMemcopyDefaultBlocksize), + memcopy_threshold_(kMemcopyDefaultThreshold) { + buffer_ = buffer; + DCHECK(buffer->is_mutable()) << "Must pass mutable buffer"; + mutable_data_ = buffer->mutable_data(); + size_ = buffer->size(); + position_ = 0; + } -FixedSizeBufferWriter::~FixedSizeBufferWriter() {} + ~FixedSizeBufferWriterImpl() {} -Status FixedSizeBufferWriter::Close() { - // no-op - return Status::OK(); -} + Status Close() { + // No-op + return Status::OK(); + } -Status FixedSizeBufferWriter::Seek(int64_t position) { - if (position < 0 || position >= size_) { - return Status::IOError("position out of bounds"); + Status Seek(int64_t position) { + if (position < 0 || position >= size_) { + return Status::IOError("position out of bounds"); + } + position_ = position; + return Status::OK(); } - position_ = position; - return Status::OK(); -} + + Status Tell(int64_t* position) { + *position = position_; + return Status::OK(); + } + + Status Write(const uint8_t* data, int64_t nbytes) { + if (nbytes > memcopy_threshold_ && memcopy_num_threads_ > 1) { + internal::parallel_memcopy(mutable_data_ + position_, data, nbytes, + memcopy_blocksize_, memcopy_num_threads_); + } else { + memcpy(mutable_data_ + position_, data, nbytes); + } + position_ += nbytes; + return Status::OK(); + } + + Status WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) { + std::lock_guard guard(lock_); + RETURN_NOT_OK(Seek(position)); + return Write(data, nbytes); + } + + void set_memcopy_threads(int num_threads) { memcopy_num_threads_ = num_threads; } + + void set_memcopy_blocksize(int64_t blocksize) { memcopy_blocksize_ = blocksize; } + + void set_memcopy_threshold(int64_t threshold) { memcopy_threshold_ = threshold; } + + private: + std::mutex lock_; + std::shared_ptr buffer_; + uint8_t* mutable_data_; + int64_t size_; + int64_t position_; + + int memcopy_num_threads_; + int64_t memcopy_blocksize_; + int64_t memcopy_threshold_; +}; + +FixedSizeBufferWriter::~FixedSizeBufferWriter() {} + +FixedSizeBufferWriter::FixedSizeBufferWriter(const std::shared_ptr& buffer) + : impl_(new FixedSizeBufferWriterImpl(buffer)) {} + +Status FixedSizeBufferWriter::Close() { return impl_->Close(); } + +Status FixedSizeBufferWriter::Seek(int64_t position) { return impl_->Seek(position); } Status FixedSizeBufferWriter::Tell(int64_t* position) const { - *position = position_; - return Status::OK(); + return impl_->Tell(position); } Status FixedSizeBufferWriter::Write(const uint8_t* data, int64_t nbytes) { - if (nbytes > memcopy_threshold_ && memcopy_num_threads_ > 1) { - internal::parallel_memcopy(mutable_data_ + position_, data, nbytes, - memcopy_blocksize_, memcopy_num_threads_); - } else { - memcpy(mutable_data_ + position_, data, nbytes); - } - position_ += nbytes; - return Status::OK(); + return impl_->Write(data, nbytes); } Status FixedSizeBufferWriter::WriteAt(int64_t position, const uint8_t* data, int64_t nbytes) { - std::lock_guard guard(lock_); - RETURN_NOT_OK(Seek(position)); - return Write(data, nbytes); + return impl_->WriteAt(position, data, nbytes); } void FixedSizeBufferWriter::set_memcopy_threads(int num_threads) { - memcopy_num_threads_ = num_threads; + impl_->set_memcopy_threads(num_threads); } void FixedSizeBufferWriter::set_memcopy_blocksize(int64_t blocksize) { - memcopy_blocksize_ = blocksize; + impl_->set_memcopy_blocksize(blocksize); } void FixedSizeBufferWriter::set_memcopy_threshold(int64_t threshold) { - memcopy_threshold_ = threshold; + impl_->set_memcopy_threshold(threshold); } // ---------------------------------------------------------------------- @@ -233,6 +276,18 @@ Status BufferReader::Read(int64_t nbytes, std::shared_ptr* out) { return Status::OK(); } +Status BufferReader::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + uint8_t* out) { + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, out); +} + +Status BufferReader::ReadAt(int64_t position, int64_t nbytes, + std::shared_ptr* out) { + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, out); +} + Status BufferReader::GetSize(int64_t* size) { *size = size_; return Status::OK(); http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/io/memory.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/io/memory.h b/cpp/src/arrow/io/memory.h index 563000f..978c198 100644 --- a/cpp/src/arrow/io/memory.h +++ b/cpp/src/arrow/io/memory.h @@ -22,7 +22,6 @@ #include #include -#include #include "arrow/io/interfaces.h" #include "arrow/util/visibility.h" @@ -99,15 +98,8 @@ class ARROW_EXPORT FixedSizeBufferWriter : public WriteableFile { void set_memcopy_threshold(int64_t threshold); protected: - std::mutex lock_; - std::shared_ptr buffer_; - uint8_t* mutable_data_; - int64_t size_; - int64_t position_; - - int memcopy_num_threads_; - int64_t memcopy_blocksize_; - int64_t memcopy_threshold_; + class FixedSizeBufferWriterImpl; + std::unique_ptr impl_; }; /// \class BufferReader @@ -125,6 +117,12 @@ class ARROW_EXPORT BufferReader : public RandomAccessFile { // Zero copy read Status Read(int64_t nbytes, std::shared_ptr* out) override; + Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + uint8_t* out) override; + + /// Default implementation is thread-safe + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + Status GetSize(int64_t* size) override; Status Seek(int64_t position) override; http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/memory_pool-test.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool-test.cc b/cpp/src/arrow/memory_pool-test.cc index 552c79b..0a4785d 100644 --- a/cpp/src/arrow/memory_pool-test.cc +++ b/cpp/src/arrow/memory_pool-test.cc @@ -59,39 +59,36 @@ TEST(DefaultMemoryPoolDeathTest, FreeLargeMemory) { } TEST(DefaultMemoryPoolDeathTest, MaxMemory) { - DefaultMemoryPool pool; - - ASSERT_EQ(0, pool.max_memory()); + MemoryPool* pool = default_memory_pool(); uint8_t* data; - ASSERT_OK(pool.Allocate(100, &data)); + ASSERT_OK(pool->Allocate(100, &data)); uint8_t* data2; - ASSERT_OK(pool.Allocate(100, &data2)); + ASSERT_OK(pool->Allocate(100, &data2)); - pool.Free(data, 100); - pool.Free(data2, 100); + pool->Free(data, 100); + pool->Free(data2, 100); - ASSERT_EQ(200, pool.max_memory()); + ASSERT_EQ(200, pool->max_memory()); } #endif // ARROW_VALGRIND TEST(LoggingMemoryPool, Logging) { - DefaultMemoryPool pool; - LoggingMemoryPool lp(&pool); + MemoryPool* pool = default_memory_pool(); - ASSERT_EQ(0, lp.max_memory()); + LoggingMemoryPool lp(pool); uint8_t* data; - ASSERT_OK(pool.Allocate(100, &data)); + ASSERT_OK(pool->Allocate(100, &data)); uint8_t* data2; - ASSERT_OK(pool.Allocate(100, &data2)); + ASSERT_OK(pool->Allocate(100, &data2)); - pool.Free(data, 100); - pool.Free(data2, 100); + pool->Free(data, 100); + pool->Free(data2, 100); - ASSERT_EQ(200, pool.max_memory()); + ASSERT_EQ(200, pool->max_memory()); } } // namespace arrow http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/memory_pool.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool.cc b/cpp/src/arrow/memory_pool.cc index 851065b..3496636 100644 --- a/cpp/src/arrow/memory_pool.cc +++ b/cpp/src/arrow/memory_pool.cc @@ -85,74 +85,82 @@ MemoryPool::~MemoryPool() {} int64_t MemoryPool::max_memory() const { return -1; } -DefaultMemoryPool::DefaultMemoryPool() : bytes_allocated_(0) { max_memory_ = 0; } +class DefaultMemoryPool : public MemoryPool { + public: + DefaultMemoryPool() : bytes_allocated_(0) { max_memory_ = 0; } -Status DefaultMemoryPool::Allocate(int64_t size, uint8_t** out) { - RETURN_NOT_OK(AllocateAligned(size, out)); - bytes_allocated_ += size; + ~DefaultMemoryPool() {} - { - std::lock_guard guard(lock_); - if (bytes_allocated_ > max_memory_) { - max_memory_ = bytes_allocated_.load(); + Status Allocate(int64_t size, uint8_t** out) override { + RETURN_NOT_OK(AllocateAligned(size, out)); + bytes_allocated_ += size; + + { + std::lock_guard guard(lock_); + if (bytes_allocated_ > max_memory_) { + max_memory_ = bytes_allocated_.load(); + } } + return Status::OK(); } - return Status::OK(); -} -Status DefaultMemoryPool::Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) { + Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override { #ifdef ARROW_JEMALLOC - *ptr = reinterpret_cast(rallocx(*ptr, new_size, MALLOCX_ALIGN(kAlignment))); - if (*ptr == NULL) { - std::stringstream ss; - ss << "realloc of size " << new_size << " failed"; - return Status::OutOfMemory(ss.str()); - } + *ptr = reinterpret_cast(rallocx(*ptr, new_size, MALLOCX_ALIGN(kAlignment))); + if (*ptr == NULL) { + std::stringstream ss; + ss << "realloc of size " << new_size << " failed"; + return Status::OutOfMemory(ss.str()); + } #else - // Note: We cannot use realloc() here as it doesn't guarantee alignment. - - // Allocate new chunk - uint8_t* out = nullptr; - RETURN_NOT_OK(AllocateAligned(new_size, &out)); - DCHECK(out); - // Copy contents and release old memory chunk - memcpy(out, *ptr, static_cast(std::min(new_size, old_size))); + // Note: We cannot use realloc() here as it doesn't guarantee alignment. + + // Allocate new chunk + uint8_t* out = nullptr; + RETURN_NOT_OK(AllocateAligned(new_size, &out)); + DCHECK(out); + // Copy contents and release old memory chunk + memcpy(out, *ptr, static_cast(std::min(new_size, old_size))); #ifdef _MSC_VER - _aligned_free(*ptr); + _aligned_free(*ptr); #else - std::free(*ptr); + std::free(*ptr); #endif // defined(_MSC_VER) - *ptr = out; + *ptr = out; #endif // defined(ARROW_JEMALLOC) - bytes_allocated_ += new_size - old_size; - { - std::lock_guard guard(lock_); - if (bytes_allocated_ > max_memory_) { - max_memory_ = bytes_allocated_.load(); + bytes_allocated_ += new_size - old_size; + { + std::lock_guard guard(lock_); + if (bytes_allocated_ > max_memory_) { + max_memory_ = bytes_allocated_.load(); + } } - } - return Status::OK(); -} + return Status::OK(); + } -int64_t DefaultMemoryPool::bytes_allocated() const { return bytes_allocated_.load(); } + int64_t bytes_allocated() const override { return bytes_allocated_.load(); } -void DefaultMemoryPool::Free(uint8_t* buffer, int64_t size) { - DCHECK_GE(bytes_allocated_, size); + void Free(uint8_t* buffer, int64_t size) override { + DCHECK_GE(bytes_allocated_, size); #ifdef _MSC_VER - _aligned_free(buffer); + _aligned_free(buffer); #elif defined(ARROW_JEMALLOC) - dallocx(buffer, MALLOCX_ALIGN(kAlignment)); + dallocx(buffer, MALLOCX_ALIGN(kAlignment)); #else - std::free(buffer); + std::free(buffer); #endif - bytes_allocated_ -= size; -} + bytes_allocated_ -= size; + } -int64_t DefaultMemoryPool::max_memory() const { return max_memory_.load(); } + int64_t max_memory() const override { return max_memory_.load(); } -DefaultMemoryPool::~DefaultMemoryPool() {} + private: + mutable std::mutex lock_; + std::atomic bytes_allocated_; + std::atomic max_memory_; +}; MemoryPool* default_memory_pool() { static DefaultMemoryPool default_memory_pool_; http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/memory_pool.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/memory_pool.h b/cpp/src/arrow/memory_pool.h index 5bb2b56..52ec67f 100644 --- a/cpp/src/arrow/memory_pool.h +++ b/cpp/src/arrow/memory_pool.h @@ -20,7 +20,6 @@ #include #include -#include #include "arrow/util/visibility.h" @@ -69,26 +68,6 @@ class ARROW_EXPORT MemoryPool { MemoryPool(); }; -class ARROW_EXPORT DefaultMemoryPool : public MemoryPool { - public: - DefaultMemoryPool(); - virtual ~DefaultMemoryPool(); - - Status Allocate(int64_t size, uint8_t** out) override; - Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override; - - void Free(uint8_t* buffer, int64_t size) override; - - int64_t bytes_allocated() const override; - - int64_t max_memory() const override; - - private: - mutable std::mutex lock_; - std::atomic bytes_allocated_; - std::atomic max_memory_; -}; - class ARROW_EXPORT LoggingMemoryPool : public MemoryPool { public: explicit LoggingMemoryPool(MemoryPool* pool); http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/python/io.cc ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/io.cc b/cpp/src/arrow/python/io.cc index 99b99f7..b01358a 100644 --- a/cpp/src/arrow/python/io.cc +++ b/cpp/src/arrow/python/io.cc @@ -19,6 +19,7 @@ #include #include +#include #include #include "arrow/io/memory.h" @@ -33,10 +34,6 @@ namespace py { // ---------------------------------------------------------------------- // Python file -PythonFile::PythonFile(PyObject* file) : file_(file) { Py_INCREF(file_); } - -PythonFile::~PythonFile() { Py_DECREF(file_); } - // This is annoying: because C++11 does not allow implicit conversion of string // literals to non-const char*, we need to go through some gymnastics to use // PyObject_CallMethod without a lot of pain (its arguments are non-const @@ -48,53 +45,68 @@ static inline PyObject* cpp_PyObject_CallMethod(PyObject* obj, const char* metho const_cast(argspec), args...); } -Status PythonFile::Close() { - // whence: 0 for relative to start of file, 2 for end of file - PyObject* result = cpp_PyObject_CallMethod(file_, "close", "()"); - Py_XDECREF(result); - PY_RETURN_IF_ERROR(StatusCode::IOError); - return Status::OK(); -} - -Status PythonFile::Seek(int64_t position, int whence) { - // whence: 0 for relative to start of file, 2 for end of file - PyObject* result = cpp_PyObject_CallMethod(file_, "seek", "(ii)", position, whence); - Py_XDECREF(result); - PY_RETURN_IF_ERROR(StatusCode::IOError); - return Status::OK(); -} - -Status PythonFile::Read(int64_t nbytes, PyObject** out) { - PyObject* result = cpp_PyObject_CallMethod(file_, "read", "(i)", nbytes); - PY_RETURN_IF_ERROR(StatusCode::IOError); - *out = result; - return Status::OK(); -} - -Status PythonFile::Write(const uint8_t* data, int64_t nbytes) { - PyObject* py_data = - PyBytes_FromStringAndSize(reinterpret_cast(data), nbytes); - PY_RETURN_IF_ERROR(StatusCode::IOError); - - PyObject* result = cpp_PyObject_CallMethod(file_, "write", "(O)", py_data); - Py_XDECREF(py_data); - Py_XDECREF(result); - PY_RETURN_IF_ERROR(StatusCode::IOError); - return Status::OK(); -} - -Status PythonFile::Tell(int64_t* position) { - PyObject* result = cpp_PyObject_CallMethod(file_, "tell", "()"); - PY_RETURN_IF_ERROR(StatusCode::IOError); - - *position = PyLong_AsLongLong(result); - Py_DECREF(result); - - // PyLong_AsLongLong can raise OverflowError - PY_RETURN_IF_ERROR(StatusCode::IOError); - - return Status::OK(); -} +// A common interface to a Python file-like object. Must acquire GIL before +// calling any methods +class PythonFile { + public: + explicit PythonFile(PyObject* file) : file_(file) { Py_INCREF(file_); } + + ~PythonFile() { Py_DECREF(file_); } + + Status Close() { + // whence: 0 for relative to start of file, 2 for end of file + PyObject* result = cpp_PyObject_CallMethod(file_, "close", "()"); + Py_XDECREF(result); + PY_RETURN_IF_ERROR(StatusCode::IOError); + return Status::OK(); + } + + Status Seek(int64_t position, int whence) { + // whence: 0 for relative to start of file, 2 for end of file + PyObject* result = cpp_PyObject_CallMethod(file_, "seek", "(ii)", position, whence); + Py_XDECREF(result); + PY_RETURN_IF_ERROR(StatusCode::IOError); + return Status::OK(); + } + + Status Read(int64_t nbytes, PyObject** out) { + PyObject* result = cpp_PyObject_CallMethod(file_, "read", "(i)", nbytes); + PY_RETURN_IF_ERROR(StatusCode::IOError); + *out = result; + return Status::OK(); + } + + Status Write(const uint8_t* data, int64_t nbytes) { + PyObject* py_data = + PyBytes_FromStringAndSize(reinterpret_cast(data), nbytes); + PY_RETURN_IF_ERROR(StatusCode::IOError); + + PyObject* result = cpp_PyObject_CallMethod(file_, "write", "(O)", py_data); + Py_XDECREF(py_data); + Py_XDECREF(result); + PY_RETURN_IF_ERROR(StatusCode::IOError); + return Status::OK(); + } + + Status Tell(int64_t* position) { + PyObject* result = cpp_PyObject_CallMethod(file_, "tell", "()"); + PY_RETURN_IF_ERROR(StatusCode::IOError); + + *position = PyLong_AsLongLong(result); + Py_DECREF(result); + + // PyLong_AsLongLong can raise OverflowError + PY_RETURN_IF_ERROR(StatusCode::IOError); + + return Status::OK(); + } + + std::mutex& lock() { return lock_; } + + private: + std::mutex lock_; + PyObject* file_; +}; // ---------------------------------------------------------------------- // Seekable input stream @@ -142,6 +154,20 @@ Status PyReadableFile::Read(int64_t nbytes, std::shared_ptr* out) { return Status::OK(); } +Status PyReadableFile::ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + uint8_t* out) { + std::lock_guard guard(file_->lock()); + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, bytes_read, out); +} + +Status PyReadableFile::ReadAt(int64_t position, int64_t nbytes, + std::shared_ptr* out) { + std::lock_guard guard(file_->lock()); + RETURN_NOT_OK(Seek(position)); + return Read(nbytes, out); +} + Status PyReadableFile::GetSize(int64_t* size) { PyAcquireGIL lock; http://git-wip-us.apache.org/repos/asf/arrow/blob/b29b0658/cpp/src/arrow/python/io.h ---------------------------------------------------------------------- diff --git a/cpp/src/arrow/python/io.h b/cpp/src/arrow/python/io.h index eda7346..bf5db53 100644 --- a/cpp/src/arrow/python/io.h +++ b/cpp/src/arrow/python/io.h @@ -32,22 +32,7 @@ class MemoryPool; namespace py { -// A common interface to a Python file-like object. Must acquire GIL before -// calling any methods -class ARROW_EXPORT PythonFile { - public: - explicit PythonFile(PyObject* file); - ~PythonFile(); - - Status Close(); - Status Seek(int64_t position, int whence); - Status Read(int64_t nbytes, PyObject** out); - Status Tell(int64_t* position); - Status Write(const uint8_t* data, int64_t nbytes); - - private: - PyObject* file_; -}; +class ARROW_NO_EXPORT PythonFile; class ARROW_EXPORT PyReadableFile : public io::RandomAccessFile { public: @@ -59,6 +44,13 @@ class ARROW_EXPORT PyReadableFile : public io::RandomAccessFile { Status Read(int64_t nbytes, int64_t* bytes_read, uint8_t* out) override; Status Read(int64_t nbytes, std::shared_ptr* out) override; + // Thread-safe version + Status ReadAt(int64_t position, int64_t nbytes, int64_t* bytes_read, + uint8_t* out) override; + + // Thread-safe version + Status ReadAt(int64_t position, int64_t nbytes, std::shared_ptr* out) override; + Status GetSize(int64_t* size) override; Status Seek(int64_t position) override;