Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id BDEC8200C05 for ; Mon, 23 Jan 2017 08:55:47 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id BCA1A160B3E; Mon, 23 Jan 2017 07:55:47 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id C2B2E160B49 for ; Mon, 23 Jan 2017 08:55:46 +0100 (CET) Received: (qmail 42336 invoked by uid 500); 23 Jan 2017 07:55:46 -0000 Mailing-List: contact commits-help@parquet.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.apache.org Delivered-To: mailing list commits@parquet.apache.org Received: (qmail 42324 invoked by uid 99); 23 Jan 2017 07:55:46 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Jan 2017 07:55:45 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A8D06DFBE7; Mon, 23 Jan 2017 07:55:45 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: uwe@apache.org To: commits@parquet.apache.org Message-Id: <74200c2b716b4d97a6234f32e9f955e2@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: parquet-cpp git commit: PARQUET-837: Remove RandomAccessSource::Seek method which can be a source of thread safety problems Date: Mon, 23 Jan 2017 07:55:45 +0000 (UTC) archived-at: Mon, 23 Jan 2017 07:55:47 -0000 Repository: parquet-cpp Updated Branches: refs/heads/master 8f0a51d42 -> fecdcbf69 PARQUET-837: Remove RandomAccessSource::Seek method which can be a source of thread safety problems We were only using `Seek` in a couple places. This will eliminate a source of difficult-to-reproduce race conditions Author: Wes McKinney Closes #221 from wesm/PARQUET-837 and squashes the following commits: f658549 [Wes McKinney] Remove RandomAccessSource::Seek method which can be a source of thread-safety issues Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/fecdcbf6 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/fecdcbf6 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/fecdcbf6 Branch: refs/heads/master Commit: fecdcbf69c55223958b775afb217e7674ca4991e Parents: 8f0a51d Author: Wes McKinney Authored: Mon Jan 23 08:55:31 2017 +0100 Committer: Uwe L. Korn Committed: Mon Jan 23 08:55:31 2017 +0100 ---------------------------------------------------------------------- src/parquet/arrow/arrow-reader-writer-test.cc | 7 ++++--- src/parquet/column/column-writer-test.cc | 3 ++- src/parquet/column/scanner-test.cc | 3 ++- src/parquet/encodings/encoding-test.cc | 3 ++- src/parquet/file/file-serialize-test.cc | 3 ++- src/parquet/file/reader-internal.cc | 9 +++++---- src/parquet/thrift/util.h | 6 ++++-- src/parquet/util/memory-test.cc | 8 +------- src/parquet/util/memory.cc | 14 ++++++++------ src/parquet/util/memory.h | 13 +++++++++---- src/parquet/util/test-common.h | 3 ++- 11 files changed, 41 insertions(+), 31 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 0418d42..57986de 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -268,7 +268,8 @@ class TestParquetIO : public ::testing::Test { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType, - ::arrow::StringType, ::arrow::BinaryType> TestTypes; + ::arrow::StringType, ::arrow::BinaryType> + TestTypes; TYPED_TEST_CASE(TestParquetIO, TestTypes); @@ -620,8 +621,8 @@ class TestPrimitiveParquetIO : public TestParquetIO { typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type, ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type, - ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, - ::arrow::DoubleType> PrimitiveTestTypes; + ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, ::arrow::DoubleType> + PrimitiveTestTypes; TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/column/column-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc index 446811e..fc944ca 100644 --- a/src/parquet/column/column-writer-test.cc +++ b/src/parquet/column/column-writer-test.cc @@ -244,7 +244,8 @@ void TestPrimitiveWriter::ReadColumnFully(Compression::type compressio } typedef ::testing::Types TestTypes; + BooleanType, ByteArrayType, FLBAType> + TestTypes; TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/column/scanner-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc index 3ac07dc..8eee191 100644 --- a/src/parquet/column/scanner-test.cc +++ b/src/parquet/column/scanner-test.cc @@ -146,7 +146,8 @@ static int num_pages = 20; static int batch_size = 32; typedef ::testing::Types TestTypes; + ByteArrayType> + TestTypes; using TestBooleanFlatScanner = TestFlatScanner; using TestFLBAFlatScanner = TestFlatScanner; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/encodings/encoding-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc index 914904e..a1aa448 100644 --- a/src/parquet/encodings/encoding-test.cc +++ b/src/parquet/encodings/encoding-test.cc @@ -237,7 +237,8 @@ TYPED_TEST(TestPlainEncoding, BasicRoundTrip) { // Dictionary encoding tests typedef ::testing::Types DictEncodedTypes; + ByteArrayType, FLBAType> + DictEncodedTypes; template class TestDictionaryEncoding : public TestEncodingBase { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/file/file-serialize-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc index 71dd5c4..7a90eeb 100644 --- a/src/parquet/file/file-serialize-test.cc +++ b/src/parquet/file/file-serialize-test.cc @@ -106,7 +106,8 @@ class TestSerialize : public PrimitiveTypedTest { }; typedef ::testing::Types TestTypes; + BooleanType, ByteArrayType, FLBAType> + TestTypes; TYPED_TEST_CASE(TestSerialize, TestTypes); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/file/reader-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc index 9608f58..d6fcc48 100644 --- a/src/parquet/file/reader-internal.cc +++ b/src/parquet/file/reader-internal.cc @@ -254,8 +254,9 @@ void SerializedFile::ParseMetaData() { } uint8_t footer_buffer[FOOTER_SIZE]; - source_->Seek(file_size - FOOTER_SIZE); - int64_t bytes_read = source_->Read(FOOTER_SIZE, footer_buffer); + int64_t bytes_read = + source_->ReadAt(file_size - FOOTER_SIZE, FOOTER_SIZE, footer_buffer); + if (bytes_read != FOOTER_SIZE || memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) { throw ParquetException("Invalid parquet file. Corrupt footer."); } @@ -267,11 +268,11 @@ void SerializedFile::ParseMetaData() { "Invalid parquet file. File is less than " "file metadata size."); } - source_->Seek(metadata_start); std::shared_ptr metadata_buffer = AllocateBuffer(properties_.allocator(), metadata_len); - bytes_read = source_->Read(metadata_len, metadata_buffer->mutable_data()); + bytes_read = + source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data()); if (bytes_read != metadata_len) { throw ParquetException("Invalid parquet file. Could not read metadata bytes."); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/thrift/util.h ---------------------------------------------------------------------- diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h index 92ea82c..9d2b66f 100644 --- a/src/parquet/thrift/util.h +++ b/src/parquet/thrift/util.h @@ -99,7 +99,8 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali boost::shared_ptr tmem_transport( new apache::thrift::transport::TMemoryBuffer(const_cast(buf), *len)); apache::thrift::protocol::TCompactProtocolFactoryT< - apache::thrift::transport::TMemoryBuffer> tproto_factory; + apache::thrift::transport::TMemoryBuffer> + tproto_factory; boost::shared_ptr tproto = tproto_factory.getProtocol(tmem_transport); try { @@ -121,7 +122,8 @@ inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) { boost::shared_ptr mem_buffer( new apache::thrift::transport::TMemoryBuffer(len)); apache::thrift::protocol::TCompactProtocolFactoryT< - apache::thrift::transport::TMemoryBuffer> tproto_factory; + apache::thrift::transport::TMemoryBuffer> + tproto_factory; boost::shared_ptr tproto = tproto_factory.getProtocol(mem_buffer); try { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/util/memory-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc index 45aa819..5225261 100644 --- a/src/parquet/util/memory-test.cc +++ b/src/parquet/util/memory-test.cc @@ -360,16 +360,10 @@ TEST(TestArrowInputFile, Basics) { auto source = std::make_shared(file); ASSERT_EQ(0, source->Tell()); - ASSERT_NO_THROW(source->Seek(5)); - ASSERT_EQ(5, source->Tell()); - ASSERT_NO_THROW(source->Seek(0)); - - // Seek out of bounds - ASSERT_THROW(source->Seek(100), ParquetException); uint8_t buffer[50]; - ASSERT_NO_THROW(source->Read(4, buffer)); + ASSERT_NO_THROW(source->ReadAt(0, 4, buffer)); ASSERT_EQ(0, std::memcmp(buffer, "this", 4)); ASSERT_EQ(4, source->Tell()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/util/memory.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc index 72ed3ac..744dff9 100644 --- a/src/parquet/util/memory.cc +++ b/src/parquet/util/memory.cc @@ -378,10 +378,6 @@ int64_t ArrowInputFile::Size() const { return size; } -void ArrowInputFile::Seek(int64_t position) { - PARQUET_THROW_NOT_OK(file_->Seek(position)); -} - // Returns bytes read int64_t ArrowInputFile::Read(int64_t nbytes, uint8_t* out) { int64_t bytes_read = 0; @@ -401,6 +397,12 @@ std::shared_ptr ArrowInputFile::ReadAt(int64_t position, int64_t nbytes) return out; } +int64_t ArrowInputFile::ReadAt(int64_t position, int64_t nbytes, uint8_t* out) { + int64_t bytes_read = 0; + PARQUET_THROW_NOT_OK(file_->ReadAt(position, nbytes, &bytes_read, out)); + return bytes_read; +} + ArrowOutputStream::ArrowOutputStream( const std::shared_ptr<::arrow::io::OutputStream> file) : file_(file) {} @@ -509,9 +511,9 @@ const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes } // Read more data when buffer has insufficient left or when resized if (*num_bytes > (buffer_size_ - buffer_offset_)) { - source_->Seek(stream_offset_); buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_); - int64_t bytes_read = source_->Read(buffer_size_, buffer_->mutable_data()); + int64_t bytes_read = + source_->ReadAt(stream_offset_, buffer_size_, buffer_->mutable_data()); if (bytes_read < *num_bytes) { throw ParquetException("Failed reading column data from source"); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/util/memory.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h index 2a71c37..e82df95 100644 --- a/src/parquet/util/memory.h +++ b/src/parquet/util/memory.h @@ -260,20 +260,23 @@ class PARQUET_EXPORT FileInterface { virtual int64_t Tell() = 0; }; +/// It is the responsibility of implementations to mind threadsafety of shared +/// resources class PARQUET_EXPORT RandomAccessSource : virtual public FileInterface { public: virtual ~RandomAccessSource() {} virtual int64_t Size() const = 0; - virtual void Seek(int64_t position) = 0; - // Returns bytes read virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0; virtual std::shared_ptr Read(int64_t nbytes) = 0; virtual std::shared_ptr ReadAt(int64_t position, int64_t nbytes) = 0; + + /// Returns bytes read + virtual int64_t ReadAt(int64_t position, int64_t nbytes, uint8_t* out) = 0; }; class PARQUET_EXPORT OutputStream : virtual public FileInterface { @@ -295,6 +298,7 @@ class PARQUET_EXPORT ArrowFileMethods : virtual public FileInterface { virtual ::arrow::io::FileInterface* file_interface() = 0; }; +/// This interface depends on the threadsafety of the underlying Arrow file interface class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAccessSource { public: explicit ArrowInputFile( @@ -302,8 +306,6 @@ class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAcce int64_t Size() const override; - void Seek(int64_t position) override; - // Returns bytes read int64_t Read(int64_t nbytes, uint8_t* out) override; @@ -311,6 +313,9 @@ class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAcce std::shared_ptr ReadAt(int64_t position, int64_t nbytes) override; + /// Returns bytes read + int64_t ReadAt(int64_t position, int64_t nbytes, uint8_t* out) override; + std::shared_ptr<::arrow::io::ReadableFileInterface> file() const { return file_; } // Diamond inheritance http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/util/test-common.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h index edadb53..2327aeb 100644 --- a/src/parquet/util/test-common.h +++ b/src/parquet/util/test-common.h @@ -32,7 +32,8 @@ namespace parquet { namespace test { typedef ::testing::Types ParquetTypes; + DoubleType, ByteArrayType, FLBAType> + ParquetTypes; template static inline void assert_vector_equal(const vector& left, const vector& right) {