parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [parquet-cpp] branch master updated: PARQUET-1350: [C++] Use abstract ResizableBuffer instead of concrete PoolBuffer
Date Mon, 23 Jul 2018 18:29:16 GMT
This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 049af6f  PARQUET-1350: [C++] Use abstract ResizableBuffer instead of concrete PoolBuffer
049af6f is described below

commit 049af6f57293661944ff9fc26d3688466ed86968
Author: Antoine Pitrou <antoine@python.org>
AuthorDate: Mon Jul 23 14:29:11 2018 -0400

    PARQUET-1350: [C++] Use abstract ResizableBuffer instead of concrete PoolBuffer
    
    Author: Antoine Pitrou <antoine@python.org>
    
    Closes #477 from pitrou/PARQUET-1350-pool-buffer and squashes the following commits:
    
    3af304e [Antoine Pitrou] Fix linting
    6ffc72d [Antoine Pitrou] PARQUET-1350: [C++] Use abstract ResizableBuffer instead of concrete PoolBuffer
---
 src/parquet/arrow/arrow-reader-writer-test.cc |  7 +++----
 src/parquet/arrow/reader.cc                   | 29 +++++++++++++--------------
 src/parquet/arrow/record_reader.cc            | 28 +++++++++++++-------------
 src/parquet/arrow/record_reader.h             |  4 ++--
 src/parquet/arrow/test-util.h                 |  5 ++---
 src/parquet/arrow/writer.cc                   | 12 +++++------
 src/parquet/column-io-benchmark.cc            |  4 ++--
 src/parquet/column_reader-test.cc             |  2 +-
 src/parquet/column_reader.cc                  |  2 +-
 src/parquet/column_reader.h                   |  6 +++---
 src/parquet/column_scanner.h                  |  4 ++--
 src/parquet/column_writer.cc                  |  2 +-
 src/parquet/encoding-benchmark.cc             |  4 ++--
 src/parquet/encoding-internal.h               | 14 ++++++-------
 src/parquet/encoding-test.cc                  |  2 +-
 src/parquet/encoding.h                        | 11 +++++-----
 src/parquet/file_reader.cc                    |  2 +-
 src/parquet/statistics.h                      | 10 ++++-----
 src/parquet/test-util.h                       |  2 +-
 src/parquet/util/memory-test.cc               |  3 ++-
 src/parquet/util/memory.cc                    | 19 +++++-------------
 src/parquet/util/memory.h                     | 13 +++++-------
 22 files changed, 86 insertions(+), 99 deletions(-)

diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index e0ff7aa..1c2f322 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -53,7 +53,7 @@ using arrow::ChunkedArray;
 using arrow::Column;
 using arrow::DataType;
 using arrow::ListArray;
-using arrow::PoolBuffer;
+using arrow::ResizableBuffer;
 using arrow::PrimitiveArray;
 using arrow::Status;
 using arrow::Table;
@@ -791,7 +791,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
   ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, false);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  auto buffer = std::make_shared<::arrow::PoolBuffer>();
+  auto buffer = AllocateBuffer();
 
   {
     // BufferOutputStream closed on gc
@@ -946,8 +946,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
   ASSERT_OK_NO_THROW(
       WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties));
 
-  std::shared_ptr<PoolBuffer> int64_data =
-      std::make_shared<PoolBuffer>(::arrow::default_memory_pool());
+  std::shared_ptr<ResizableBuffer> int64_data = AllocateBuffer();
   {
     ASSERT_OK(int64_data->Resize(sizeof(int64_t) * values->length()));
     auto int64_data_ptr = reinterpret_cast<int64_t*>(int64_data->mutable_data());
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 1f933e6..be270a8 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -46,7 +46,7 @@ using arrow::Field;
 using arrow::Int32Array;
 using arrow::ListArray;
 using arrow::MemoryPool;
-using arrow::PoolBuffer;
+using arrow::ResizableBuffer;
 using arrow::Status;
 using arrow::StructArray;
 using arrow::Table;
@@ -303,8 +303,7 @@ class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
                       int16_t struct_def_level, MemoryPool* pool, const Node* node)
       : children_(children),
         struct_def_level_(struct_def_level),
-        pool_(pool),
-        def_levels_buffer_(pool) {
+        pool_(pool) {
     InitField(node, children);
   }
 
@@ -318,7 +317,7 @@ class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
   int16_t struct_def_level_;
   MemoryPool* pool_;
   std::shared_ptr<Field> field_;
-  PoolBuffer def_levels_buffer_;
+  std::shared_ptr<ResizableBuffer> def_levels_buffer_;
 
   Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
   void InitField(const Node* node,
@@ -849,7 +848,7 @@ struct TransferFunctor {
     std::copy(values, values + length, out_ptr);
 
     if (reader->nullable_values()) {
-      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
       *out = std::make_shared<ArrayType<ArrowType>>(type, length, data, is_valid,
                                                     reader->null_count());
     } else {
@@ -866,10 +865,10 @@ struct TransferFunctor<ArrowType, ParquetType,
                     const std::shared_ptr<::arrow::DataType>& type,
                     std::shared_ptr<Array>* out) {
     int64_t length = reader->values_written();
-    std::shared_ptr<PoolBuffer> values = reader->ReleaseValues();
+    std::shared_ptr<ResizableBuffer> values = reader->ReleaseValues();
 
     if (reader->nullable_values()) {
-      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
       *out = std::make_shared<ArrayType<ArrowType>>(type, length, values, is_valid,
                                                     reader->null_count());
     } else {
@@ -902,7 +901,7 @@ struct TransferFunctor<::arrow::BooleanType, BooleanType> {
     }
 
     if (reader->nullable_values()) {
-      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
       RETURN_NOT_OK(is_valid->Resize(BytesForBits(length), false));
       *out = std::make_shared<BooleanArray>(type, length, data, is_valid,
                                             reader->null_count());
@@ -930,7 +929,7 @@ struct TransferFunctor<::arrow::TimestampType, Int96Type> {
     }
 
     if (reader->nullable_values()) {
-      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
       *out = std::make_shared<TimestampArray>(type, length, data, is_valid,
                                               reader->null_count());
     } else {
@@ -958,7 +957,7 @@ struct TransferFunctor<::arrow::Date64Type, Int32Type> {
     }
 
     if (reader->nullable_values()) {
-      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
       *out = std::make_shared<::arrow::Date64Array>(type, length, data, is_valid,
                                                     reader->null_count());
     } else {
@@ -1197,7 +1196,7 @@ static Status DecimalIntegerTransfer(RecordReader* reader, MemoryPool* pool,
   }
 
   if (reader->nullable_values()) {
-    std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+    std::shared_ptr<ResizableBuffer> is_valid = reader->ReleaseIsValid();
     *out = std::make_shared<::arrow::Decimal128Array>(type, length, data, is_valid,
                                                       reader->null_count());
   } else {
@@ -1385,10 +1384,10 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
   size_t child_length;
   RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
   auto size = child_length * sizeof(int16_t);
-  RETURN_NOT_OK(def_levels_buffer_.Resize(size));
+  RETURN_NOT_OK(AllocateResizableBuffer(pool_, size, &def_levels_buffer_));
   // Initialize with the minimal def level
-  std::memset(def_levels_buffer_.mutable_data(), -1, size);
-  auto result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  std::memset(def_levels_buffer_->mutable_data(), -1, size);
+  auto result_levels = reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
 
   // When a struct is defined, all of its children def levels are at least at
   // nesting level, and def level equals nesting level.
@@ -1408,7 +1407,7 @@ Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
           std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_));
     }
   }
-  *data = reinterpret_cast<const int16_t*>(def_levels_buffer_.data());
+  *data = reinterpret_cast<const int16_t*>(def_levels_buffer_->data());
   *length = child_length;
   return Status::OK();
 }
diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc
index a3af5ac..b4d8766 100644
--- a/src/parquet/arrow/record_reader.cc
+++ b/src/parquet/arrow/record_reader.cc
@@ -68,10 +68,10 @@ class RecordReader::RecordReaderImpl {
         levels_position_(0),
         levels_capacity_(0) {
     nullable_values_ = internal::HasSpacedValues(descr);
-    values_ = std::make_shared<PoolBuffer>(pool);
-    valid_bits_ = std::make_shared<PoolBuffer>(pool);
-    def_levels_ = std::make_shared<PoolBuffer>(pool);
-    rep_levels_ = std::make_shared<PoolBuffer>(pool);
+    values_ = AllocateBuffer(pool);
+    valid_bits_ = AllocateBuffer(pool);
+    def_levels_ = AllocateBuffer(pool);
+    rep_levels_ = AllocateBuffer(pool);
 
     if (descr->physical_type() == Type::BYTE_ARRAY) {
       builder_.reset(new ::arrow::BinaryBuilder(pool));
@@ -121,15 +121,15 @@ class RecordReader::RecordReaderImpl {
 
   bool nullable_values() const { return nullable_values_; }
 
-  std::shared_ptr<PoolBuffer> ReleaseValues() {
+  std::shared_ptr<ResizableBuffer> ReleaseValues() {
     auto result = values_;
-    values_ = std::make_shared<PoolBuffer>(pool_);
+    values_ = AllocateBuffer(pool_);
     return result;
   }
 
-  std::shared_ptr<PoolBuffer> ReleaseIsValid() {
+  std::shared_ptr<ResizableBuffer> ReleaseIsValid() {
     auto result = valid_bits_;
-    valid_bits_ = std::make_shared<PoolBuffer>(pool_);
+    valid_bits_ = AllocateBuffer(pool_);
     return result;
   }
 
@@ -328,16 +328,16 @@ class RecordReader::RecordReaderImpl {
   // TODO(wesm): ByteArray / FixedLenByteArray types
   std::unique_ptr<::arrow::ArrayBuilder> builder_;
 
-  std::shared_ptr<::arrow::PoolBuffer> values_;
+  std::shared_ptr<::arrow::ResizableBuffer> values_;
 
   template <typename T>
   T* ValuesHead() {
     return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
   }
 
-  std::shared_ptr<::arrow::PoolBuffer> valid_bits_;
-  std::shared_ptr<::arrow::PoolBuffer> def_levels_;
-  std::shared_ptr<::arrow::PoolBuffer> rep_levels_;
+  std::shared_ptr<::arrow::ResizableBuffer> valid_bits_;
+  std::shared_ptr<::arrow::ResizableBuffer> def_levels_;
+  std::shared_ptr<::arrow::ResizableBuffer> rep_levels_;
 };
 
 // The minimum number of repetition/definition levels to decode at a time, for
@@ -775,11 +775,11 @@ const int16_t* RecordReader::rep_levels() const { return impl_->rep_levels(); }
 
 const uint8_t* RecordReader::values() const { return impl_->values(); }
 
-std::shared_ptr<PoolBuffer> RecordReader::ReleaseValues() {
+std::shared_ptr<ResizableBuffer> RecordReader::ReleaseValues() {
   return impl_->ReleaseValues();
 }
 
-std::shared_ptr<PoolBuffer> RecordReader::ReleaseIsValid() {
+std::shared_ptr<ResizableBuffer> RecordReader::ReleaseIsValid() {
   return impl_->ReleaseIsValid();
 }
 
diff --git a/src/parquet/arrow/record_reader.h b/src/parquet/arrow/record_reader.h
index 9ca8b68..f02bf05 100644
--- a/src/parquet/arrow/record_reader.h
+++ b/src/parquet/arrow/record_reader.h
@@ -73,8 +73,8 @@ class RecordReader {
   /// result of calling ReadRecords
   void Reset();
 
-  std::shared_ptr<PoolBuffer> ReleaseValues();
-  std::shared_ptr<PoolBuffer> ReleaseIsValid();
+  std::shared_ptr<ResizableBuffer> ReleaseValues();
+  std::shared_ptr<ResizableBuffer> ReleaseIsValid();
   ::arrow::ArrayBuilder* builder();
 
   /// \brief Number of values written including nulls (if any)
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index c70e0ef..4db98b7 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -363,12 +363,11 @@ Status MakeListArray(const std::shared_ptr<Array>& values, int64_t size,
   int64_t non_null_entries = size - null_count - 1;
   int64_t length_per_entry = values->length() / non_null_entries;
 
-  auto offsets = std::make_shared<::arrow::PoolBuffer>(::arrow::default_memory_pool());
+  auto offsets = AllocateBuffer();
   RETURN_NOT_OK(offsets->Resize((size + 1) * sizeof(int32_t)));
   int32_t* offsets_ptr = reinterpret_cast<int32_t*>(offsets->mutable_data());
 
-  auto null_bitmap =
-      std::make_shared<::arrow::PoolBuffer>(::arrow::default_memory_pool());
+  auto null_bitmap = AllocateBuffer();
   int64_t bitmap_size = ::arrow::BitUtil::CeilByte(size) / 8;
   RETURN_NOT_OK(null_bitmap->Resize(bitmap_size));
   uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index f772738..f3ddda9 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -41,7 +41,7 @@ using arrow::Int16Builder;
 using arrow::ListArray;
 using arrow::MemoryPool;
 using arrow::NumericArray;
-using arrow::PoolBuffer;
+using arrow::ResizableBuffer;
 using arrow::PrimitiveArray;
 using arrow::Status;
 using arrow::Table;
@@ -109,7 +109,7 @@ class LevelBuilder {
 
   Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
                         int64_t* values_offset, int64_t* num_values, int64_t* num_levels,
-                        const std::shared_ptr<PoolBuffer>& def_levels_scratch,
+                        const std::shared_ptr<ResizableBuffer>& def_levels_scratch,
                         std::shared_ptr<Buffer>* def_levels_out,
                         std::shared_ptr<Buffer>* rep_levels_out,
                         std::shared_ptr<Array>* values_array) {
@@ -266,8 +266,8 @@ Status LevelBuilder::VisitInline(const Array& array) {
 struct ColumnWriterContext {
   ColumnWriterContext(MemoryPool* memory_pool, ArrowWriterProperties* properties)
       : memory_pool(memory_pool), properties(properties) {
-    this->data_buffer = std::make_shared<PoolBuffer>(memory_pool);
-    this->def_levels_buffer = std::make_shared<PoolBuffer>(memory_pool);
+    this->data_buffer = AllocateBuffer(memory_pool);
+    this->def_levels_buffer = AllocateBuffer(memory_pool);
   }
 
   template <typename T>
@@ -282,10 +282,10 @@ struct ColumnWriterContext {
 
   // Buffer used for storing the data of an array converted to the physical type
   // as expected by parquet-cpp.
-  std::shared_ptr<PoolBuffer> data_buffer;
+  std::shared_ptr<ResizableBuffer> data_buffer;
 
   // We use the shared ownership of this buffer
-  std::shared_ptr<PoolBuffer> def_levels_buffer;
+  std::shared_ptr<ResizableBuffer> def_levels_buffer;
 };
 
 Status GetLeafType(const ::arrow::DataType& type, ::arrow::Type::type* leaf_type) {
diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc
index ad625bd..a9a7530 100644
--- a/src/parquet/column-io-benchmark.cc
+++ b/src/parquet/column-io-benchmark.cc
@@ -188,7 +188,7 @@ static void BM_RleEncoding(::benchmark::State& state) {
   int16_t max_level = 1;
   int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
                                                  static_cast<int>(levels.size()));
-  auto buffer_rle = std::make_shared<PoolBuffer>();
+  auto buffer_rle = AllocateBuffer();
   PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size));
 
   while (state.KeepRunning()) {
@@ -212,7 +212,7 @@ static void BM_RleDecoding(::benchmark::State& state) {
   int16_t max_level = 1;
   int rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
                                              static_cast<int>(levels.size()));
-  auto buffer_rle = std::make_shared<PoolBuffer>();
+  auto buffer_rle = AllocateBuffer();
   PARQUET_THROW_NOT_OK(buffer_rle->Resize(rle_size + sizeof(int32_t)));
   level_encoder.Init(Encoding::RLE, max_level, static_cast<int>(levels.size()),
                      buffer_rle->mutable_data() + sizeof(int32_t), rle_size);
diff --git a/src/parquet/column_reader-test.cc b/src/parquet/column_reader-test.cc
index 15ddc8b..273b302 100644
--- a/src/parquet/column_reader-test.cc
+++ b/src/parquet/column_reader-test.cc
@@ -328,7 +328,7 @@ TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
   max_rep_level_ = 0;
   NodePtr type = schema::Int32("a", Repetition::REQUIRED);
   const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
-  shared_ptr<PoolBuffer> dummy = std::make_shared<PoolBuffer>();
+  shared_ptr<ResizableBuffer> dummy = AllocateBuffer();
 
   shared_ptr<DictionaryPage> dict_page =
       std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index bcbb339..bc3ee8a 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -124,7 +124,7 @@ class SerializedPageReader : public PageReader {
 
   // Compression codec to use.
   std::unique_ptr<::arrow::Codec> decompressor_;
-  std::shared_ptr<PoolBuffer> decompression_buffer_;
+  std::shared_ptr<ResizableBuffer> decompression_buffer_;
 
   // Maximum allowed page size
   uint32_t max_page_header_size_;
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index 0d5f6ec..7134632 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -488,12 +488,12 @@ int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
       int64_t batch_size = 1024;  // ReadBatch with a smaller memory footprint
       int64_t values_read = 0;
 
-      std::shared_ptr<PoolBuffer> vals = AllocateBuffer(
+      std::shared_ptr<ResizableBuffer> vals = AllocateBuffer(
           this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size);
-      std::shared_ptr<PoolBuffer> def_levels =
+      std::shared_ptr<ResizableBuffer> def_levels =
           AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
 
-      std::shared_ptr<PoolBuffer> rep_levels =
+      std::shared_ptr<ResizableBuffer> rep_levels =
           AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
 
       do {
diff --git a/src/parquet/column_scanner.h b/src/parquet/column_scanner.h
index 2917201..0a866ee 100644
--- a/src/parquet/column_scanner.h
+++ b/src/parquet/column_scanner.h
@@ -44,7 +44,7 @@ class PARQUET_EXPORT Scanner {
       : batch_size_(batch_size),
         level_offset_(0),
         levels_buffered_(0),
-        value_buffer_(std::make_shared<PoolBuffer>(pool)),
+        value_buffer_(AllocateBuffer(pool)),
         value_offset_(0),
         values_buffered_(0),
         reader_(reader) {
@@ -77,7 +77,7 @@ class PARQUET_EXPORT Scanner {
   int level_offset_;
   int levels_buffered_;
 
-  std::shared_ptr<PoolBuffer> value_buffer_;
+  std::shared_ptr<ResizableBuffer> value_buffer_;
   int value_offset_;
   int64_t values_buffered_;
 
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 8a1b56c..b3ff8c3 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -510,7 +510,7 @@ void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
 template <typename Type>
 void TypedColumnWriter<Type>::WriteDictionaryPage() {
   auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
-  std::shared_ptr<PoolBuffer> buffer =
+  std::shared_ptr<ResizableBuffer> buffer =
       AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
   dict_encoder->WriteDict(buffer->mutable_data());
   // TODO Get rid of this deep call
diff --git a/src/parquet/encoding-benchmark.cc b/src/parquet/encoding-benchmark.cc
index ca12c6a..5ea8f8f 100644
--- a/src/parquet/encoding-benchmark.cc
+++ b/src/parquet/encoding-benchmark.cc
@@ -113,10 +113,10 @@ static void DecodeDict(std::vector<typename Type::c_type>& values,
     encoder.Put(values[i]);
   }
 
-  std::shared_ptr<PoolBuffer> dict_buffer =
+  std::shared_ptr<ResizableBuffer> dict_buffer =
       AllocateBuffer(allocator, encoder.dict_encoded_size());
 
-  std::shared_ptr<PoolBuffer> indices =
+  std::shared_ptr<ResizableBuffer> indices =
       AllocateBuffer(allocator, encoder.EstimatedDataEncodedSize());
 
   encoder.WriteDict(dict_buffer->mutable_data());
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index e22edd0..98f9e4a 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -271,7 +271,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
  protected:
   int bits_available_;
   std::unique_ptr<::arrow::BitWriter> bit_writer_;
-  std::shared_ptr<PoolBuffer> bits_buffer_;
+  std::shared_ptr<ResizableBuffer> bits_buffer_;
   std::unique_ptr<InMemoryOutputStream> values_sink_;
 };
 
@@ -370,7 +370,7 @@ class DictionaryDecoder : public Decoder<Type> {
 
   // Data that contains the byte array data (byte_array_dictionary_ just has the
   // pointers).
-  std::shared_ptr<PoolBuffer> byte_array_data_;
+  std::shared_ptr<ResizableBuffer> byte_array_data_;
 
   ::arrow::RleDecoder idx_decoder_;
 };
@@ -514,7 +514,7 @@ class DictEncoder : public Encoder<DType> {
   void Put(const T& value);
 
   std::shared_ptr<Buffer> FlushValues() override {
-    std::shared_ptr<PoolBuffer> buffer =
+    std::shared_ptr<ResizableBuffer> buffer =
         AllocateBuffer(this->allocator_, EstimatedDataEncodedSize());
     int result_size = WriteIndices(buffer->mutable_data(),
                                    static_cast<int>(EstimatedDataEncodedSize()));
@@ -784,8 +784,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
 
   explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
                                ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
-      : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED),
-        delta_bit_widths_(new PoolBuffer(pool)) {
+      : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED), pool_(pool) {
     if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
       throw ParquetException("Delta bit pack encoding should only be for integer data.");
     }
@@ -813,8 +812,8 @@ class DeltaBitPackDecoder : public Decoder<DType> {
       ParquetException::EofException();
     }
     if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
-    PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_, false));
 
+    delta_bit_widths_ = AllocateBuffer(pool_, num_mini_blocks_);
     uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
 
     if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
@@ -858,6 +857,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
     return max_values;
   }
 
+  ::arrow::MemoryPool* pool_;
   ::arrow::BitReader decoder_;
   int32_t values_current_block_;
   int32_t num_mini_blocks_;
@@ -866,7 +866,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
 
   int32_t min_delta_;
   size_t mini_block_idx_;
-  std::unique_ptr<PoolBuffer> delta_bit_widths_;
+  std::shared_ptr<ResizableBuffer> delta_bit_widths_;
   int delta_bit_width_;
 
   int32_t last_value_;
diff --git a/src/parquet/encoding-test.cc b/src/parquet/encoding-test.cc
index 8d97bff..31bb79d 100644
--- a/src/parquet/encoding-test.cc
+++ b/src/parquet/encoding-test.cc
@@ -292,7 +292,7 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
 
  protected:
   USING_BASE_MEMBERS();
-  std::shared_ptr<PoolBuffer> dict_buffer_;
+  std::shared_ptr<ResizableBuffer> dict_buffer_;
 };
 
 TYPED_TEST_CASE(TestDictionaryEncoding, DictEncodedTypes);
diff --git a/src/parquet/encoding.h b/src/parquet/encoding.h
index e46ac2f..2742937 100644
--- a/src/parquet/encoding.h
+++ b/src/parquet/encoding.h
@@ -50,18 +50,19 @@ class Encoder {
   virtual void Put(const T* src, int num_values) = 0;
   virtual void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
                          int64_t valid_bits_offset) {
-    PoolBuffer buffer(pool_);
-    ::arrow::Status status = buffer.Resize(num_values * sizeof(T));
+    std::shared_ptr<ResizableBuffer> buffer;
+    auto status = ::arrow::AllocateResizableBuffer(pool_, num_values * sizeof(T),
+                                                   &buffer);
     if (!status.ok()) {
       std::ostringstream ss;
-      ss << "buffer.Resize failed in Encoder.PutSpaced in " << __FILE__ << ", on line "
-         << __LINE__;
+      ss << "AllocateResizableBuffer failed in Encoder.PutSpaced in "
+         << __FILE__ << ", on line " << __LINE__;
       throw ParquetException(ss.str());
     }
     int32_t num_valid_values = 0;
     ::arrow::internal::BitmapReader valid_bits_reader(valid_bits, valid_bits_offset,
                                                       num_values);
-    T* data = reinterpret_cast<T*>(buffer.mutable_data());
+    T* data = reinterpret_cast<T*>(buffer->mutable_data());
     for (int32_t i = 0; i < num_values; i++) {
       if (valid_bits_reader.IsSet()) {
         data[num_valid_values++] = src[i];
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc
index ae1c0a7..c5a0f34 100644
--- a/src/parquet/file_reader.cc
+++ b/src/parquet/file_reader.cc
@@ -194,7 +194,7 @@ class SerializedFile : public ParquetFileReader::Contents {
           "file metadata size.");
     }
 
-    std::shared_ptr<PoolBuffer> metadata_buffer =
+    std::shared_ptr<ResizableBuffer> metadata_buffer =
         AllocateBuffer(properties_.memory_pool(), metadata_len);
 
     // Check if the footer_buffer contains the entire metadata
diff --git a/src/parquet/statistics.h b/src/parquet/statistics.h
index 4f9df72..d1c4d16 100644
--- a/src/parquet/statistics.h
+++ b/src/parquet/statistics.h
@@ -181,19 +181,19 @@ class TypedRowGroupStatistics : public RowGroupStatistics {
 
   void PlainEncode(const T& src, std::string* dst);
   void PlainDecode(const std::string& src, T* dst);
-  void Copy(const T& src, T* dst, PoolBuffer* buffer);
+  void Copy(const T& src, T* dst, ResizableBuffer* buffer);
 
-  std::shared_ptr<PoolBuffer> min_buffer_, max_buffer_;
+  std::shared_ptr<ResizableBuffer> min_buffer_, max_buffer_;
 };
 
 template <typename DType>
-inline void TypedRowGroupStatistics<DType>::Copy(const T& src, T* dst, PoolBuffer*) {
+inline void TypedRowGroupStatistics<DType>::Copy(const T& src, T* dst, ResizableBuffer*) {
   *dst = src;
 }
 
 template <>
 inline void TypedRowGroupStatistics<FLBAType>::Copy(const FLBA& src, FLBA* dst,
-                                                    PoolBuffer* buffer) {
+                                                    ResizableBuffer* buffer) {
   if (dst->ptr == src.ptr) return;
   uint32_t len = descr_->type_length();
   PARQUET_THROW_NOT_OK(buffer->Resize(len, false));
@@ -204,7 +204,7 @@ inline void TypedRowGroupStatistics<FLBAType>::Copy(const FLBA& src, FLBA* dst,
 template <>
 inline void TypedRowGroupStatistics<ByteArrayType>::Copy(const ByteArray& src,
                                                          ByteArray* dst,
-                                                         PoolBuffer* buffer) {
+                                                         ResizableBuffer* buffer) {
   if (dst->ptr == src.ptr) return;
   PARQUET_THROW_NOT_OK(buffer->Resize(src.len, false));
   std::memcpy(buffer->mutable_data(), src.ptr, src.len);
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index a507dfb..3e74398 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -262,7 +262,7 @@ class DictionaryPageBuilder {
   }
 
   shared_ptr<Buffer> WriteDict() {
-    std::shared_ptr<PoolBuffer> dict_buffer =
+    std::shared_ptr<ResizableBuffer> dict_buffer =
         AllocateBuffer(::arrow::default_memory_pool(), encoder_->dict_encoded_size());
     encoder_->WriteDict(dict_buffer->mutable_data());
     return dict_buffer;
diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc
index 4b620ab..cb8c706 100644
--- a/src/parquet/util/memory-test.cc
+++ b/src/parquet/util/memory-test.cc
@@ -255,7 +255,8 @@ TEST(TestBufferedInputStream, Basics) {
   int64_t stream_offset = 10;
   int64_t stream_size = source_size - stream_offset;
   int64_t chunk_size = 50;
-  std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_memory_pool(), source_size);
+  std::shared_ptr<ResizableBuffer> buf = AllocateBuffer(default_memory_pool(),
+                                                        source_size);
   ASSERT_EQ(source_size, buf->size());
   for (int i = 0; i < source_size; i++) {
     buf->mutable_data()[i] = static_cast<uint8_t>(i);
diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc
index 3aa2570..df7ccc7 100644
--- a/src/parquet/util/memory.cc
+++ b/src/parquet/util/memory.cc
@@ -36,7 +36,7 @@ namespace parquet {
 
 template <class T>
 Vector<T>::Vector(int64_t size, MemoryPool* pool)
-    : buffer_(AllocateUniqueBuffer(pool, size * sizeof(T))),
+    : buffer_(AllocateBuffer(pool, size * sizeof(T))),
       size_(size),
       capacity_(size) {
   if (size > 0) {
@@ -495,19 +495,10 @@ void BufferedInputStream::Advance(int64_t num_bytes) {
   buffer_offset_ += num_bytes;
 }
 
-std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryPool* pool, int64_t size) {
-  auto result = std::make_shared<PoolBuffer>(pool);
-  if (size > 0) {
-    PARQUET_THROW_NOT_OK(result->Resize(size));
-  }
-  return result;
-}
-
-std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(MemoryPool* pool, int64_t size) {
-  std::unique_ptr<PoolBuffer> result(new PoolBuffer(pool));
-  if (size > 0) {
-    PARQUET_THROW_NOT_OK(result->Resize(size));
-  }
+std::shared_ptr<ResizableBuffer> AllocateBuffer(MemoryPool* pool, int64_t size) {
+  std::shared_ptr<ResizableBuffer> result;
+  PARQUET_THROW_NOT_OK(arrow::AllocateResizableBuffer(pool, size,
+                                                      &result));
   return result;
 }
 
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index 5408d1c..69dcebf 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -74,7 +74,7 @@ static constexpr int64_t kInMemoryDefaultCapacity = 1024;
 using Buffer = ::arrow::Buffer;
 using MutableBuffer = ::arrow::MutableBuffer;
 using ResizableBuffer = ::arrow::ResizableBuffer;
-using PoolBuffer = ::arrow::PoolBuffer;
+using ResizableBuffer = ::arrow::ResizableBuffer;
 
 template <class T>
 class PARQUET_EXPORT Vector {
@@ -89,7 +89,7 @@ class PARQUET_EXPORT Vector {
   const T* data() const { return data_; }
 
  private:
-  std::unique_ptr<PoolBuffer> buffer_;
+  std::shared_ptr<ResizableBuffer> buffer_;
   int64_t size_;
   int64_t capacity_;
   T* data_;
@@ -429,7 +429,7 @@ class PARQUET_EXPORT BufferedInputStream : public InputStream {
   virtual void Advance(int64_t num_bytes);
 
  private:
-  std::shared_ptr<PoolBuffer> buffer_;
+  std::shared_ptr<ResizableBuffer> buffer_;
   RandomAccessSource* source_;
   int64_t stream_offset_;
   int64_t stream_end_;
@@ -437,11 +437,8 @@ class PARQUET_EXPORT BufferedInputStream : public InputStream {
   int64_t buffer_size_;
 };
 
-std::shared_ptr<PoolBuffer> PARQUET_EXPORT AllocateBuffer(::arrow::MemoryPool* pool,
-                                                          int64_t size = 0);
-
-std::unique_ptr<PoolBuffer> PARQUET_EXPORT AllocateUniqueBuffer(::arrow::MemoryPool* pool,
-                                                                int64_t size = 0);
+std::shared_ptr<ResizableBuffer> PARQUET_EXPORT AllocateBuffer(
+  ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(), int64_t size = 0);
 
 }  // namespace parquet
 


Mime
View raw message