parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject parquet-cpp git commit: PARQUET-837: Remove RandomAccessSource::Seek method which can be a source of thread safety problems
Date Mon, 23 Jan 2017 07:55:45 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 8f0a51d42 -> fecdcbf69


PARQUET-837: Remove RandomAccessSource::Seek method which can be a source of thread-safety problems

We were only using `Seek` in a couple of places. Removing it eliminates a source of difficult-to-reproduce
race conditions.

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #221 from wesm/PARQUET-837 and squashes the following commits:

f658549 [Wes McKinney] Remove RandomAccessSource::Seek method which can be a source of thread-safety issues


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/fecdcbf6
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/fecdcbf6
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/fecdcbf6

Branch: refs/heads/master
Commit: fecdcbf69c55223958b775afb217e7674ca4991e
Parents: 8f0a51d
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Mon Jan 23 08:55:31 2017 +0100
Committer: Uwe L. Korn <uwelk@xhochy.com>
Committed: Mon Jan 23 08:55:31 2017 +0100

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc |  7 ++++---
 src/parquet/column/column-writer-test.cc      |  3 ++-
 src/parquet/column/scanner-test.cc            |  3 ++-
 src/parquet/encodings/encoding-test.cc        |  3 ++-
 src/parquet/file/file-serialize-test.cc       |  3 ++-
 src/parquet/file/reader-internal.cc           |  9 +++++----
 src/parquet/thrift/util.h                     |  6 ++++--
 src/parquet/util/memory-test.cc               |  8 +-------
 src/parquet/util/memory.cc                    | 14 ++++++++------
 src/parquet/util/memory.h                     | 13 +++++++++----
 src/parquet/util/test-common.h                |  3 ++-
 11 files changed, 41 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 0418d42..57986de 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -268,7 +268,8 @@ class TestParquetIO : public ::testing::Test {
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
     ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
-    ::arrow::StringType, ::arrow::BinaryType> TestTypes;
+    ::arrow::StringType, ::arrow::BinaryType>
+    TestTypes;
 
 TYPED_TEST_CASE(TestParquetIO, TestTypes);
 
@@ -620,8 +621,8 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
 
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type,
-    ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType,
-    ::arrow::DoubleType> PrimitiveTestTypes;
+    ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, ::arrow::DoubleType>
+    PrimitiveTestTypes;
 
 TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 446811e..fc944ca 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -244,7 +244,8 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio
 }
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    BooleanType, ByteArrayType, FLBAType> TestTypes;
+    BooleanType, ByteArrayType, FLBAType>
+    TestTypes;
 
 TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/column/scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc
index 3ac07dc..8eee191 100644
--- a/src/parquet/column/scanner-test.cc
+++ b/src/parquet/column/scanner-test.cc
@@ -146,7 +146,8 @@ static int num_pages = 20;
 static int batch_size = 32;
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    ByteArrayType> TestTypes;
+    ByteArrayType>
+    TestTypes;
 
 using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
 using TestFLBAFlatScanner = TestFlatScanner<FLBAType>;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
index 914904e..a1aa448 100644
--- a/src/parquet/encodings/encoding-test.cc
+++ b/src/parquet/encodings/encoding-test.cc
@@ -237,7 +237,8 @@ TYPED_TEST(TestPlainEncoding, BasicRoundTrip) {
 // Dictionary encoding tests
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    ByteArrayType, FLBAType> DictEncodedTypes;
+    ByteArrayType, FLBAType>
+    DictEncodedTypes;
 
 template <typename Type>
 class TestDictionaryEncoding : public TestEncodingBase<Type> {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 71dd5c4..7a90eeb 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -106,7 +106,8 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
 };
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    BooleanType, ByteArrayType, FLBAType> TestTypes;
+    BooleanType, ByteArrayType, FLBAType>
+    TestTypes;
 
 TYPED_TEST_CASE(TestSerialize, TestTypes);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 9608f58..d6fcc48 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -254,8 +254,9 @@ void SerializedFile::ParseMetaData() {
   }
 
   uint8_t footer_buffer[FOOTER_SIZE];
-  source_->Seek(file_size - FOOTER_SIZE);
-  int64_t bytes_read = source_->Read(FOOTER_SIZE, footer_buffer);
+  int64_t bytes_read =
+      source_->ReadAt(file_size - FOOTER_SIZE, FOOTER_SIZE, footer_buffer);
+
   if (bytes_read != FOOTER_SIZE || memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
     throw ParquetException("Invalid parquet file. Corrupt footer.");
   }
@@ -267,11 +268,11 @@ void SerializedFile::ParseMetaData() {
         "Invalid parquet file. File is less than "
         "file metadata size.");
   }
-  source_->Seek(metadata_start);
 
   std::shared_ptr<PoolBuffer> metadata_buffer =
       AllocateBuffer(properties_.allocator(), metadata_len);
-  bytes_read = source_->Read(metadata_len, metadata_buffer->mutable_data());
+  bytes_read =
+      source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data());
   if (bytes_read != metadata_len) {
     throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/thrift/util.h
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h
index 92ea82c..9d2b66f 100644
--- a/src/parquet/thrift/util.h
+++ b/src/parquet/thrift/util.h
@@ -99,7 +99,8 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali
   boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport(
       new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
   apache::thrift::protocol::TCompactProtocolFactoryT<
-      apache::thrift::transport::TMemoryBuffer> tproto_factory;
+      apache::thrift::transport::TMemoryBuffer>
+      tproto_factory;
   boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
       tproto_factory.getProtocol(tmem_transport);
   try {
@@ -121,7 +122,8 @@ inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
   boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer(
       new apache::thrift::transport::TMemoryBuffer(len));
   apache::thrift::protocol::TCompactProtocolFactoryT<
-      apache::thrift::transport::TMemoryBuffer> tproto_factory;
+      apache::thrift::transport::TMemoryBuffer>
+      tproto_factory;
   boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
       tproto_factory.getProtocol(mem_buffer);
   try {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/util/memory-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc
index 45aa819..5225261 100644
--- a/src/parquet/util/memory-test.cc
+++ b/src/parquet/util/memory-test.cc
@@ -360,16 +360,10 @@ TEST(TestArrowInputFile, Basics) {
   auto source = std::make_shared<ArrowInputFile>(file);
 
   ASSERT_EQ(0, source->Tell());
-  ASSERT_NO_THROW(source->Seek(5));
-  ASSERT_EQ(5, source->Tell());
-  ASSERT_NO_THROW(source->Seek(0));
-
-  // Seek out of bounds
-  ASSERT_THROW(source->Seek(100), ParquetException);
 
   uint8_t buffer[50];
 
-  ASSERT_NO_THROW(source->Read(4, buffer));
+  ASSERT_NO_THROW(source->ReadAt(0, 4, buffer));
   ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
   ASSERT_EQ(4, source->Tell());
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/util/memory.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc
index 72ed3ac..744dff9 100644
--- a/src/parquet/util/memory.cc
+++ b/src/parquet/util/memory.cc
@@ -378,10 +378,6 @@ int64_t ArrowInputFile::Size() const {
   return size;
 }
 
-void ArrowInputFile::Seek(int64_t position) {
-  PARQUET_THROW_NOT_OK(file_->Seek(position));
-}
-
 // Returns bytes read
 int64_t ArrowInputFile::Read(int64_t nbytes, uint8_t* out) {
   int64_t bytes_read = 0;
@@ -401,6 +397,12 @@ std::shared_ptr<Buffer> ArrowInputFile::ReadAt(int64_t position, int64_t nbytes)
   return out;
 }
 
+int64_t ArrowInputFile::ReadAt(int64_t position, int64_t nbytes, uint8_t* out) {
+  int64_t bytes_read = 0;
+  PARQUET_THROW_NOT_OK(file_->ReadAt(position, nbytes, &bytes_read, out));
+  return bytes_read;
+}
+
 ArrowOutputStream::ArrowOutputStream(
     const std::shared_ptr<::arrow::io::OutputStream> file)
     : file_(file) {}
@@ -509,9 +511,9 @@ const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes
   }
   // Read more data when buffer has insufficient left or when resized
   if (*num_bytes > (buffer_size_ - buffer_offset_)) {
-    source_->Seek(stream_offset_);
     buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_);
-    int64_t bytes_read = source_->Read(buffer_size_, buffer_->mutable_data());
+    int64_t bytes_read =
+        source_->ReadAt(stream_offset_, buffer_size_, buffer_->mutable_data());
     if (bytes_read < *num_bytes) {
       throw ParquetException("Failed reading column data from source");
     }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/util/memory.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index 2a71c37..e82df95 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -260,20 +260,23 @@ class PARQUET_EXPORT FileInterface {
   virtual int64_t Tell() = 0;
 };
 
+/// It is the responsibility of implementations to mind threadsafety of shared
+/// resources
 class PARQUET_EXPORT RandomAccessSource : virtual public FileInterface {
  public:
   virtual ~RandomAccessSource() {}
 
   virtual int64_t Size() const = 0;
 
-  virtual void Seek(int64_t position) = 0;
-
   // Returns bytes read
   virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;
 
   virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0;
 
   virtual std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) = 0;
+
+  /// Returns bytes read
+  virtual int64_t ReadAt(int64_t position, int64_t nbytes, uint8_t* out) = 0;
 };
 
 class PARQUET_EXPORT OutputStream : virtual public FileInterface {
@@ -295,6 +298,7 @@ class PARQUET_EXPORT ArrowFileMethods : virtual public FileInterface {
   virtual ::arrow::io::FileInterface* file_interface() = 0;
 };
 
+/// This interface depends on the threadsafety of the underlying Arrow file interface
 class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAccessSource
{
  public:
   explicit ArrowInputFile(
@@ -302,8 +306,6 @@ class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAcce
 
   int64_t Size() const override;
 
-  void Seek(int64_t position) override;
-
   // Returns bytes read
   int64_t Read(int64_t nbytes, uint8_t* out) override;
 
@@ -311,6 +313,9 @@ class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAcce
 
   std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) override;
 
+  /// Returns bytes read
+  int64_t ReadAt(int64_t position, int64_t nbytes, uint8_t* out) override;
+
   std::shared_ptr<::arrow::io::ReadableFileInterface> file() const { return file_;
}
 
   // Diamond inheritance

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/fecdcbf6/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
index edadb53..2327aeb 100644
--- a/src/parquet/util/test-common.h
+++ b/src/parquet/util/test-common.h
@@ -32,7 +32,8 @@ namespace parquet {
 namespace test {
 
 typedef ::testing::Types<BooleanType, Int32Type, Int64Type, Int96Type, FloatType,
-    DoubleType, ByteArrayType, FLBAType> ParquetTypes;
+    DoubleType, ByteArrayType, FLBAType>
+    ParquetTypes;
 
 template <typename T>
 static inline void assert_vector_equal(const vector<T>& left, const vector<T>&
right) {


Mime
View raw message