parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-874: Use default memory allocator from Arrow
Date Sat, 11 Feb 2017 21:48:31 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master c14706f6e -> 5e59bc5c6


PARQUET-874: Use default memory allocator from Arrow

Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>

Closes #241 from xhochy/PARQUET-874 and squashes the following commits:

7e7fbfb [Korn, Uwe] Fix benchmark compilation
c57b2fa [Korn, Uwe] PARQUET-874: Use default memory allocator from Arrow


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/5e59bc5c
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/5e59bc5c
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/5e59bc5c

Branch: refs/heads/master
Commit: 5e59bc5c6491a7505585c08fd62aa52f9a6c9afc
Parents: c14706f
Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>
Authored: Sat Feb 11 16:48:24 2017 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Sat Feb 11 16:48:24 2017 -0500

----------------------------------------------------------------------
 src/parquet/column/properties.h         | 28 ++++++------
 src/parquet/column/reader.cc            | 31 +++++++------
 src/parquet/column/reader.h             | 16 +++----
 src/parquet/column/scanner.cc           | 23 +++++-----
 src/parquet/column/scanner.h            | 10 ++--
 src/parquet/column/statistics-test.cc   | 19 ++++----
 src/parquet/column/statistics.cc        | 28 ++++++------
 src/parquet/column/statistics.h         |  8 ++--
 src/parquet/column/test-util.h          |  2 +-
 src/parquet/column/writer.cc            | 19 ++++----
 src/parquet/column/writer.h             |  2 +-
 src/parquet/encoding-benchmark.cc       |  5 +-
 src/parquet/encoding-internal.h         | 54 +++++++++++-----------
 src/parquet/encoding-test.cc            | 12 +++--
 src/parquet/encoding.h                  |  8 ++--
 src/parquet/file/file-serialize-test.cc |  3 +-
 src/parquet/file/reader-internal.cc     | 10 ++--
 src/parquet/file/reader-internal.h      |  3 +-
 src/parquet/file/reader.cc              |  4 +-
 src/parquet/file/writer-internal.cc     | 10 ++--
 src/parquet/file/writer-internal.h      |  4 +-
 src/parquet/thrift.h                    |  6 ++-
 src/parquet/util/memory-test.cc         | 50 ++------------------
 src/parquet/util/memory.cc              | 68 +++++++---------------------
 src/parquet/util/memory.h               | 35 ++++----------
 src/parquet/util/test-common.h          |  3 +-
 26 files changed, 194 insertions(+), 267 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index b9b0bf3..6165603 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -39,20 +39,20 @@ static bool DEFAULT_USE_BUFFERED_STREAM = false;
 
 class PARQUET_EXPORT ReaderProperties {
  public:
-  explicit ReaderProperties(MemoryAllocator* allocator = default_allocator())
-      : allocator_(allocator) {
+  explicit ReaderProperties(::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : pool_(pool) {
     buffered_stream_enabled_ = DEFAULT_USE_BUFFERED_STREAM;
     buffer_size_ = DEFAULT_BUFFER_SIZE;
   }
 
-  MemoryAllocator* allocator() const { return allocator_; }
+  ::arrow::MemoryPool* memory_pool() const { return pool_; }
 
   std::unique_ptr<InputStream> GetStream(
       RandomAccessSource* source, int64_t start, int64_t num_bytes) {
     std::unique_ptr<InputStream> stream;
     if (buffered_stream_enabled_) {
       stream.reset(
-          new BufferedInputStream(allocator_, buffer_size_, source, start, num_bytes));
+          new BufferedInputStream(pool_, buffer_size_, source, start, num_bytes));
     } else {
       stream.reset(new InMemoryInputStream(source, start, num_bytes));
     }
@@ -70,7 +70,7 @@ class PARQUET_EXPORT ReaderProperties {
   int64_t buffer_size() const { return buffer_size_; }
 
  private:
-  MemoryAllocator* allocator_;
+  ::arrow::MemoryPool* pool_;
   int64_t buffer_size_;
   bool buffered_stream_enabled_;
 };
@@ -110,7 +110,7 @@ class PARQUET_EXPORT WriterProperties {
   class Builder {
    public:
     Builder()
-        : allocator_(default_allocator()),
+        : pool_(::arrow::default_memory_pool()),
           dictionary_pagesize_limit_(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT),
           write_batch_size_(DEFAULT_WRITE_BATCH_SIZE),
           pagesize_(DEFAULT_PAGE_SIZE),
@@ -118,8 +118,8 @@ class PARQUET_EXPORT WriterProperties {
           created_by_(DEFAULT_CREATED_BY) {}
     virtual ~Builder() {}
 
-    Builder* allocator(MemoryAllocator* allocator) {
-      allocator_ = allocator;
+    Builder* memory_pool(::arrow::MemoryPool* pool) {
+      pool_ = pool;
       return this;
     }
 
@@ -281,13 +281,13 @@ class PARQUET_EXPORT WriterProperties {
       for (const auto& item : statistics_enabled_)
         get(item.first).statistics_enabled = item.second;
 
-      return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
+      return std::shared_ptr<WriterProperties>(new WriterProperties(pool_,
           dictionary_pagesize_limit_, write_batch_size_, pagesize_, version_, created_by_,
           default_column_properties_, column_properties));
     }
 
    private:
-    MemoryAllocator* allocator_;
+    ::arrow::MemoryPool* pool_;
     int64_t dictionary_pagesize_limit_;
     int64_t write_batch_size_;
     int64_t pagesize_;
@@ -302,7 +302,7 @@ class PARQUET_EXPORT WriterProperties {
     std::unordered_map<std::string, bool> statistics_enabled_;
   };
 
-  inline MemoryAllocator* allocator() const { return allocator_; }
+  inline ::arrow::MemoryPool* memory_pool() const { return pool_; }
 
   inline int64_t dictionary_pagesize_limit() const { return dictionary_pagesize_limit_; }
 
@@ -354,11 +354,11 @@ class PARQUET_EXPORT WriterProperties {
   }
 
  private:
-  explicit WriterProperties(MemoryAllocator* allocator, int64_t dictionary_pagesize_limit,
+  explicit WriterProperties(::arrow::MemoryPool* pool, int64_t dictionary_pagesize_limit,
       int64_t write_batch_size, int64_t pagesize, ParquetVersion::type version,
       const std::string& created_by, const ColumnProperties& default_column_properties,
       const std::unordered_map<std::string, ColumnProperties>& column_properties)
-      : allocator_(allocator),
+      : pool_(pool),
         dictionary_pagesize_limit_(dictionary_pagesize_limit),
         write_batch_size_(write_batch_size),
         pagesize_(pagesize),
@@ -367,7 +367,7 @@ class PARQUET_EXPORT WriterProperties {
         default_column_properties_(default_column_properties),
         column_properties_(column_properties) {}
 
-  MemoryAllocator* allocator_;
+  ::arrow::MemoryPool* pool_;
   int64_t dictionary_pagesize_limit_;
   int64_t write_batch_size_;
   int64_t pagesize_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index a547fdc..71bb689 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -25,6 +25,8 @@
 #include "parquet/column/properties.h"
 #include "parquet/encoding-internal.h"
 
+using arrow::MemoryPool;
+
 namespace parquet {
 
 ReaderProperties default_reader_properties() {
@@ -32,13 +34,13 @@ ReaderProperties default_reader_properties() {
   return default_reader_properties;
 }
 
-ColumnReader::ColumnReader(const ColumnDescriptor* descr,
-    std::unique_ptr<PageReader> pager, MemoryAllocator* allocator)
+ColumnReader::ColumnReader(
+    const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool)
     : descr_(descr),
       pager_(std::move(pager)),
       num_buffered_values_(0),
       num_decoded_values_(0),
-      allocator_(allocator) {}
+      pool_(pool) {}
 
 ColumnReader::~ColumnReader() {}
 
@@ -66,7 +68,7 @@ void TypedColumnReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
     // TODO(wesm): investigate whether this all-or-nothing decoding of the
     // dictionary makes sense and whether performance can be improved
 
-    auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, allocator_);
+    auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
     decoder->SetDict(&dictionary);
     decoders_[encoding] = decoder;
   } else {
@@ -194,26 +196,25 @@ int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels)
 // ----------------------------------------------------------------------
 // Dynamic column reader constructor
 
-std::shared_ptr<ColumnReader> ColumnReader::Make(const ColumnDescriptor* descr,
-    std::unique_ptr<PageReader> pager, MemoryAllocator* allocator) {
+std::shared_ptr<ColumnReader> ColumnReader::Make(
+    const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool) {
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
-      return std::make_shared<BoolReader>(descr, std::move(pager), allocator);
+      return std::make_shared<BoolReader>(descr, std::move(pager), pool);
     case Type::INT32:
-      return std::make_shared<Int32Reader>(descr, std::move(pager), allocator);
+      return std::make_shared<Int32Reader>(descr, std::move(pager), pool);
     case Type::INT64:
-      return std::make_shared<Int64Reader>(descr, std::move(pager), allocator);
+      return std::make_shared<Int64Reader>(descr, std::move(pager), pool);
     case Type::INT96:
-      return std::make_shared<Int96Reader>(descr, std::move(pager), allocator);
+      return std::make_shared<Int96Reader>(descr, std::move(pager), pool);
     case Type::FLOAT:
-      return std::make_shared<FloatReader>(descr, std::move(pager), allocator);
+      return std::make_shared<FloatReader>(descr, std::move(pager), pool);
     case Type::DOUBLE:
-      return std::make_shared<DoubleReader>(descr, std::move(pager), allocator);
+      return std::make_shared<DoubleReader>(descr, std::move(pager), pool);
     case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayReader>(descr, std::move(pager), allocator);
+      return std::make_shared<ByteArrayReader>(descr, std::move(pager), pool);
     case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayReader>(
-          descr, std::move(pager), allocator);
+      return std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager), pool);
     default:
       ParquetException::NYI("type reader not implemented");
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 0c1e25c..e0c6585 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -41,12 +41,12 @@ namespace parquet {
 class PARQUET_EXPORT ColumnReader {
  public:
   ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
-      MemoryAllocator* allocator = default_allocator());
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
   virtual ~ColumnReader();
 
   static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor* descr,
       std::unique_ptr<PageReader> pager,
-      MemoryAllocator* allocator = default_allocator());
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
   // Returns true if there are still values in this column.
   bool HasNext() {
@@ -97,7 +97,7 @@ class PARQUET_EXPORT ColumnReader {
   // into memory
   int num_decoded_values_;
 
-  MemoryAllocator* allocator_;
+  ::arrow::MemoryPool* pool_;
 };
 
 // API to read values from a single column. This is the main client facing API.
@@ -107,8 +107,8 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
   typedef typename DType::c_type T;
 
   TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> pager,
-      MemoryAllocator* allocator = default_allocator())
-      : ColumnReader(schema, std::move(pager), allocator), current_decoder_(NULL) {}
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : ColumnReader(schema, std::move(pager), pool), current_decoder_(NULL) {}
   virtual ~TypedColumnReader() {}
 
   // Read a batch of repetition levels, definition levels, and values from the
@@ -374,12 +374,12 @@ inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
       int64_t values_read = 0;
 
       std::shared_ptr<PoolBuffer> vals = AllocateBuffer(
-          this->allocator_, batch_size * type_traits<DType::type_num>::value_byte_size);
+          this->pool_, batch_size * type_traits<DType::type_num>::value_byte_size);
       std::shared_ptr<PoolBuffer> def_levels =
-          AllocateBuffer(this->allocator_, batch_size * sizeof(int16_t));
+          AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
 
       std::shared_ptr<PoolBuffer> rep_levels =
-          AllocateBuffer(this->allocator_, batch_size * sizeof(int16_t));
+          AllocateBuffer(this->pool_, batch_size * sizeof(int16_t));
 
       do {
         batch_size = std::min(batch_size, rows_to_skip);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/scanner.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.cc b/src/parquet/column/scanner.cc
index faf99a0..0295315 100644
--- a/src/parquet/column/scanner.cc
+++ b/src/parquet/column/scanner.cc
@@ -23,28 +23,29 @@
 #include "parquet/column/reader.h"
 #include "parquet/util/memory.h"
 
+using arrow::MemoryPool;
+
 namespace parquet {
 
-std::shared_ptr<Scanner> Scanner::Make(std::shared_ptr<ColumnReader> col_reader,
-    int64_t batch_size, MemoryAllocator* allocator) {
+std::shared_ptr<Scanner> Scanner::Make(
+    std::shared_ptr<ColumnReader> col_reader, int64_t batch_size, MemoryPool* pool) {
   switch (col_reader->type()) {
     case Type::BOOLEAN:
-      return std::make_shared<BoolScanner>(col_reader, batch_size, allocator);
+      return std::make_shared<BoolScanner>(col_reader, batch_size, pool);
     case Type::INT32:
-      return std::make_shared<Int32Scanner>(col_reader, batch_size, allocator);
+      return std::make_shared<Int32Scanner>(col_reader, batch_size, pool);
     case Type::INT64:
-      return std::make_shared<Int64Scanner>(col_reader, batch_size, allocator);
+      return std::make_shared<Int64Scanner>(col_reader, batch_size, pool);
     case Type::INT96:
-      return std::make_shared<Int96Scanner>(col_reader, batch_size, allocator);
+      return std::make_shared<Int96Scanner>(col_reader, batch_size, pool);
     case Type::FLOAT:
-      return std::make_shared<FloatScanner>(col_reader, batch_size, allocator);
+      return std::make_shared<FloatScanner>(col_reader, batch_size, pool);
     case Type::DOUBLE:
-      return std::make_shared<DoubleScanner>(col_reader, batch_size, allocator);
+      return std::make_shared<DoubleScanner>(col_reader, batch_size, pool);
     case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayScanner>(col_reader, batch_size, allocator);
+      return std::make_shared<ByteArrayScanner>(col_reader, batch_size, pool);
     case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayScanner>(
-          col_reader, batch_size, allocator);
+      return std::make_shared<FixedLenByteArrayScanner>(col_reader, batch_size, pool);
     default:
       ParquetException::NYI("type reader not implemented");
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
index 17340b7..47daaed 100644
--- a/src/parquet/column/scanner.h
+++ b/src/parquet/column/scanner.h
@@ -40,11 +40,11 @@ class PARQUET_EXPORT Scanner {
  public:
   explicit Scanner(std::shared_ptr<ColumnReader> reader,
       int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
-      MemoryAllocator* allocator = default_allocator())
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
       : batch_size_(batch_size),
         level_offset_(0),
         levels_buffered_(0),
-        value_buffer_(std::make_shared<PoolBuffer>(allocator)),
+        value_buffer_(std::make_shared<PoolBuffer>(pool)),
         value_offset_(0),
         values_buffered_(0),
         reader_(reader) {
@@ -56,7 +56,7 @@ class PARQUET_EXPORT Scanner {
 
   static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
       int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
-      MemoryAllocator* allocator = default_allocator());
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
   virtual void PrintNext(std::ostream& out, int width) = 0;
 
@@ -91,8 +91,8 @@ class PARQUET_EXPORT TypedScanner : public Scanner {
 
   explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
       int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
-      MemoryAllocator* allocator = default_allocator())
-      : Scanner(reader, batch_size, allocator) {
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : Scanner(reader, batch_size, pool) {
     typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get());
     int value_byte_size = type_traits<DType::type_num>::value_byte_size;
     PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/statistics-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc
index 5eb7625..dcaad45 100644
--- a/src/parquet/column/statistics-test.cc
+++ b/src/parquet/column/statistics-test.cc
@@ -35,6 +35,9 @@
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+
 namespace parquet {
 
 using schema::NodePtr;
@@ -199,10 +202,10 @@ template <>
 std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy(
     const std::vector<FLBA>& values) {
   std::vector<FLBA> copy;
-  MemoryAllocator* allocator = default_allocator();
+  MemoryPool* pool = ::arrow::default_memory_pool();
   for (const FLBA& flba : values) {
     uint8_t* ptr;
-    PARQUET_THROW_NOT_OK(allocator->Allocate(FLBA_LENGTH, &ptr));
+    PARQUET_THROW_NOT_OK(pool->Allocate(FLBA_LENGTH, &ptr));
     memcpy(ptr, flba.ptr, FLBA_LENGTH);
     copy.emplace_back(ptr);
   }
@@ -213,10 +216,10 @@ template <>
 std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy(
     const std::vector<ByteArray>& values) {
   std::vector<ByteArray> copy;
-  MemoryAllocator* allocator = default_allocator();
+  MemoryPool* pool = default_memory_pool();
   for (const ByteArray& ba : values) {
     uint8_t* ptr;
-    PARQUET_THROW_NOT_OK(allocator->Allocate(ba.len, &ptr));
+    PARQUET_THROW_NOT_OK(pool->Allocate(ba.len, &ptr));
     memcpy(ptr, ba.ptr, ba.len);
     copy.emplace_back(ba.len, ptr);
   }
@@ -229,21 +232,21 @@ void TestRowGroupStatistics<TestType>::DeepFree(
 
 template <>
 void TestRowGroupStatistics<FLBAType>::DeepFree(std::vector<FLBA>& values) {
-  MemoryAllocator* allocator = default_allocator();
+  MemoryPool* pool = default_memory_pool();
   for (FLBA& flba : values) {
     auto ptr = const_cast<uint8_t*>(flba.ptr);
     memset(ptr, 0, FLBA_LENGTH);
-    allocator->Free(ptr, FLBA_LENGTH);
+    pool->Free(ptr, FLBA_LENGTH);
   }
 }
 
 template <>
 void TestRowGroupStatistics<ByteArrayType>::DeepFree(std::vector<ByteArray>& values) {
-  MemoryAllocator* allocator = default_allocator();
+  MemoryPool* pool = default_memory_pool();
   for (ByteArray& ba : values) {
     auto ptr = const_cast<uint8_t*>(ba.ptr);
     memset(ptr, 0, ba.len);
-    allocator->Free(ptr, ba.len);
+    pool->Free(ptr, ba.len);
   }
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/statistics.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc
index 897287e..e67a3d3 100644
--- a/src/parquet/column/statistics.cc
+++ b/src/parquet/column/statistics.cc
@@ -24,14 +24,17 @@
 #include "parquet/util/comparison.h"
 #include "parquet/util/memory.h"
 
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+
 namespace parquet {
 
 template <typename DType>
 TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(
-    const ColumnDescriptor* schema, MemoryAllocator* allocator)
-    : allocator_(allocator),
-      min_buffer_(AllocateBuffer(allocator_, 0)),
-      max_buffer_(AllocateBuffer(allocator_, 0)) {
+    const ColumnDescriptor* schema, MemoryPool* pool)
+    : pool_(pool),
+      min_buffer_(AllocateBuffer(pool_, 0)),
+      max_buffer_(AllocateBuffer(pool_, 0)) {
   SetDescr(schema);
   Reset();
 }
@@ -40,9 +43,9 @@ template <typename DType>
 TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_type& min,
     const typename DType::c_type& max, int64_t num_values, int64_t null_count,
     int64_t distinct_count)
-    : allocator_(default_allocator()),
-      min_buffer_(AllocateBuffer(allocator_, 0)),
-      max_buffer_(AllocateBuffer(allocator_, 0)) {
+    : pool_(default_memory_pool()),
+      min_buffer_(AllocateBuffer(pool_, 0)),
+      max_buffer_(AllocateBuffer(pool_, 0)) {
   IncrementNumValues(num_values);
   IncrementNullCount(null_count);
   IncrementDistinctCount(distinct_count);
@@ -55,11 +58,10 @@ TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_
 template <typename DType>
 TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor* schema,
     const std::string& encoded_min, const std::string& encoded_max, int64_t num_values,
-    int64_t null_count, int64_t distinct_count, bool has_min_max,
-    MemoryAllocator* allocator)
-    : allocator_(allocator),
-      min_buffer_(AllocateBuffer(allocator_, 0)),
-      max_buffer_(AllocateBuffer(allocator_, 0)) {
+    int64_t null_count, int64_t distinct_count, bool has_min_max, MemoryPool* pool)
+    : pool_(pool),
+      min_buffer_(AllocateBuffer(pool_, 0)),
+      max_buffer_(AllocateBuffer(pool_, 0)) {
   IncrementNumValues(num_values);
   IncrementNullCount(null_count);
   IncrementDistinctCount(distinct_count);
@@ -204,7 +206,7 @@ EncodedStatistics TypedRowGroupStatistics<DType>::Encode() {
 
 template <typename DType>
 void TypedRowGroupStatistics<DType>::PlainEncode(const T& src, std::string* dst) {
-  PlainEncoder<DType> encoder(descr(), allocator_);
+  PlainEncoder<DType> encoder(descr(), pool_);
   encoder.Put(&src, 1);
   auto buffer = encoder.FlushValues();
   auto ptr = reinterpret_cast<const char*>(buffer->data());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/statistics.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h
index af5ada6..509e587 100644
--- a/src/parquet/column/statistics.h
+++ b/src/parquet/column/statistics.h
@@ -133,8 +133,8 @@ class TypedRowGroupStatistics : public RowGroupStatistics {
  public:
   using T = typename DType::c_type;
 
-  TypedRowGroupStatistics(
-      const ColumnDescriptor* schema, MemoryAllocator* allocator = default_allocator());
+  TypedRowGroupStatistics(const ColumnDescriptor* schema,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
   TypedRowGroupStatistics(const T& min, const T& max, int64_t num_values,
       int64_t null_count, int64_t distinct_count);
@@ -142,7 +142,7 @@ class TypedRowGroupStatistics : public RowGroupStatistics {
   TypedRowGroupStatistics(const ColumnDescriptor* schema, const std::string& encoded_min,
       const std::string& encoded_max, int64_t num_values, int64_t null_count,
       int64_t distinct_count, bool has_min_max,
-      MemoryAllocator* allocator = default_allocator());
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
   bool HasMinMax() const override;
   void Reset() override;
@@ -163,7 +163,7 @@ class TypedRowGroupStatistics : public RowGroupStatistics {
   bool has_min_max_ = false;
   T min_;
   T max_;
-  MemoryAllocator* allocator_;
+  ::arrow::MemoryPool* pool_;
 
   void PlainEncode(const T& src, std::string* dst);
   void PlainDecode(const std::string& src, T* dst);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 12b818d..97e936a 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -253,7 +253,7 @@ class DictionaryPageBuilder {
 
   shared_ptr<Buffer> WriteDict() {
     std::shared_ptr<PoolBuffer> dict_buffer =
-        AllocateBuffer(default_allocator(), encoder_->dict_encoded_size());
+        AllocateBuffer(::arrow::default_memory_pool(), encoder_->dict_encoded_size());
     encoder_->WriteDict(dict_buffer->mutable_data());
     return dict_buffer;
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 7c85905..fc0372f 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -44,8 +44,8 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
       has_dictionary_(has_dictionary),
       encoding_(encoding),
       properties_(properties),
-      allocator_(properties->allocator()),
-      pool_(properties->allocator()),
+      allocator_(properties->memory_pool()),
+      pool_(properties->memory_pool()),
       num_buffered_values_(0),
       num_buffered_encoded_values_(0),
       num_rows_(0),
@@ -56,8 +56,8 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
 }
 
 void ColumnWriter::InitSinks() {
-  definition_levels_sink_.reset(new InMemoryOutputStream(properties_->allocator()));
-  repetition_levels_sink_.reset(new InMemoryOutputStream(properties_->allocator()));
+  definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+  repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
 }
 
 void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
@@ -78,8 +78,7 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels(
   int64_t rle_size =
       LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, num_buffered_values_) +
       sizeof(int32_t);
-  std::shared_ptr<PoolBuffer> buffer_rle =
-      AllocateBuffer(properties_->allocator(), rle_size);
+  std::shared_ptr<PoolBuffer> buffer_rle = AllocateBuffer(allocator_, rle_size);
   level_encoder_.Init(Encoding::RLE, max_level, num_buffered_values_,
       buffer_rle->mutable_data() + sizeof(int32_t), buffer_rle->size() - sizeof(int32_t));
   int encoded = level_encoder_.Encode(
@@ -188,12 +187,12 @@ TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
           encoding, properties) {
   switch (encoding) {
     case Encoding::PLAIN:
-      current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->allocator()));
+      current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->memory_pool()));
       break;
     case Encoding::PLAIN_DICTIONARY:
     case Encoding::RLE_DICTIONARY:
       current_encoder_.reset(
-          new DictEncoder<Type>(descr_, &pool_, properties->allocator()));
+          new DictEncoder<Type>(descr_, &pool_, properties->memory_pool()));
       break;
     default:
       ParquetException::NYI("Selected encoding is not supported");
@@ -216,7 +215,7 @@ void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
     FlushBufferedDataPages();
     fallback_ = true;
     // Only PLAIN encoding is supported for fallback in V1
-    current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->allocator()));
+    current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->memory_pool()));
     encoding_ = Encoding::PLAIN;
   }
 }
@@ -225,7 +224,7 @@ template <typename Type>
 void TypedColumnWriter<Type>::WriteDictionaryPage() {
   auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
   std::shared_ptr<PoolBuffer> buffer =
-      AllocateBuffer(properties_->allocator(), dict_encoder->dict_encoded_size());
+      AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
   dict_encoder->WriteDict(buffer->mutable_data());
   // TODO Get rid of this deep call
   dict_encoder->mem_pool()->FreeAll();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 3ee8fd6..c91f261 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -108,7 +108,7 @@ class PARQUET_EXPORT ColumnWriter {
 
   LevelEncoder level_encoder_;
 
-  MemoryAllocator* allocator_;
+  ::arrow::MemoryPool* allocator_;
   ChunkedAllocator pool_;
 
   // The total number of values stored in the data page. This is the maximum of

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/encoding-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encoding-benchmark.cc b/src/parquet/encoding-benchmark.cc
index cf65dd9..8ea684a 100644
--- a/src/parquet/encoding-benchmark.cc
+++ b/src/parquet/encoding-benchmark.cc
@@ -21,6 +21,9 @@
 #include "parquet/file/reader-internal.h"
 #include "parquet/util/memory.h"
 
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+
 namespace parquet {
 
 using format::ColumnChunk;
@@ -102,7 +105,7 @@ static void DecodeDict(
   int num_values = values.size();
 
   ChunkedAllocator pool;
-  MemoryAllocator* allocator = default_allocator();
+  MemoryPool* allocator = default_memory_pool();
   std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED);
 
   DictEncoder<Type> encoder(descr.get(), &pool, allocator);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/encoding-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index 67cd7ba..ffcb531 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -172,10 +172,10 @@ class PlainEncoder : public Encoder<DType> {
  public:
   typedef typename DType::c_type T;
 
-  explicit PlainEncoder(
-      const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
-      : Encoder<DType>(descr, Encoding::PLAIN, allocator) {
-    values_sink_.reset(new InMemoryOutputStream(allocator));
+  explicit PlainEncoder(const ColumnDescriptor* descr,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : Encoder<DType>(descr, Encoding::PLAIN, pool) {
+    values_sink_.reset(new InMemoryOutputStream(pool));
   }
 
   int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); }
@@ -190,12 +190,12 @@ class PlainEncoder : public Encoder<DType> {
 template <>
 class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
  public:
-  explicit PlainEncoder(
-      const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
-      : Encoder<BooleanType>(descr, Encoding::PLAIN, allocator),
+  explicit PlainEncoder(const ColumnDescriptor* descr,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : Encoder<BooleanType>(descr, Encoding::PLAIN, pool),
         bits_available_(kInMemoryDefaultCapacity * 8),
-        bits_buffer_(AllocateBuffer(allocator, kInMemoryDefaultCapacity)),
-        values_sink_(new InMemoryOutputStream(allocator)) {
+        bits_buffer_(AllocateBuffer(pool, kInMemoryDefaultCapacity)),
+        values_sink_(new InMemoryOutputStream(pool)) {
     bit_writer_.reset(new BitWriter(bits_buffer_->mutable_data(), bits_buffer_->size()));
   }
 
@@ -212,7 +212,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
     }
 
     std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
-    values_sink_.reset(new InMemoryOutputStream(this->allocator_));
+    values_sink_.reset(new InMemoryOutputStream(this->pool_));
     return buffer;
   }
 
@@ -267,7 +267,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
 template <typename DType>
 inline std::shared_ptr<Buffer> PlainEncoder<DType>::FlushValues() {
   std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
-  values_sink_.reset(new InMemoryOutputStream(this->allocator_));
+  values_sink_.reset(new InMemoryOutputStream(this->pool_));
   return buffer;
 }
 
@@ -309,11 +309,11 @@ class DictionaryDecoder : public Decoder<Type> {
   // Initializes the dictionary with values from 'dictionary'. The data in
   // dictionary is not guaranteed to persist in memory after this call so the
   // dictionary decoder needs to copy the data out if necessary.
-  explicit DictionaryDecoder(
-      const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
+  explicit DictionaryDecoder(const ColumnDescriptor* descr,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
       : Decoder<Type>(descr, Encoding::RLE_DICTIONARY),
-        dictionary_(0, allocator),
-        byte_array_data_(AllocateBuffer(allocator, 0)) {}
+        dictionary_(0, pool),
+        byte_array_data_(AllocateBuffer(pool, 0)) {}
 
   // Perform type-specific initialization
   void SetDict(Decoder<Type>* dictionary);
@@ -435,7 +435,7 @@ class DictEncoder : public Encoder<DType> {
   typedef typename DType::c_type T;
 
   explicit DictEncoder(const ColumnDescriptor* desc, ChunkedAllocator* pool = nullptr,
-      MemoryAllocator* allocator = default_allocator())
+      ::arrow::MemoryPool* allocator = ::arrow::default_memory_pool())
       : Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator),
         allocator_(allocator),
         pool_(pool),
@@ -524,7 +524,7 @@ class DictEncoder : public Encoder<DType> {
   int num_entries() const { return uniques_.size(); }
 
  private:
-  MemoryAllocator* allocator_;
+  ::arrow::MemoryPool* allocator_;
 
   // For ByteArray / FixedLenByteArray data. Not owned
   ChunkedAllocator* pool_;
@@ -742,10 +742,10 @@ class DeltaBitPackDecoder : public Decoder<DType> {
  public:
   typedef typename DType::c_type T;
 
-  explicit DeltaBitPackDecoder(
-      const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
+  explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
       : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED),
-        delta_bit_widths_(new PoolBuffer(allocator)) {
+        delta_bit_widths_(new PoolBuffer(pool)) {
     if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
       throw ParquetException("Delta bit pack encoding should only be for integer data.");
     }
@@ -835,10 +835,10 @@ class DeltaBitPackDecoder : public Decoder<DType> {
 
 class DeltaLengthByteArrayDecoder : public Decoder<ByteArrayType> {
  public:
-  explicit DeltaLengthByteArrayDecoder(
-      const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
+  explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
       : Decoder<ByteArrayType>(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
-        len_decoder_(nullptr, allocator) {}
+        len_decoder_(nullptr, pool) {}
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
     num_values_ = num_values;
@@ -876,11 +876,11 @@ class DeltaLengthByteArrayDecoder : public Decoder<ByteArrayType> {
 
 class DeltaByteArrayDecoder : public Decoder<ByteArrayType> {
  public:
-  explicit DeltaByteArrayDecoder(
-      const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
+  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
       : Decoder<ByteArrayType>(descr, Encoding::DELTA_BYTE_ARRAY),
-        prefix_len_decoder_(nullptr, allocator),
-        suffix_decoder_(nullptr, allocator) {}
+        prefix_len_decoder_(nullptr, pool),
+        suffix_decoder_(nullptr, pool) {}
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
     num_values_ = num_values;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encoding-test.cc b/src/parquet/encoding-test.cc
index 8f266b4..fbf6812 100644
--- a/src/parquet/encoding-test.cc
+++ b/src/parquet/encoding-test.cc
@@ -29,6 +29,9 @@
 #include "parquet/util/memory.h"
 #include "parquet/util/test-common.h"
 
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+
 using std::string;
 using std::vector;
 
@@ -146,7 +149,7 @@ class TestEncodingBase : public ::testing::Test {
   void SetUp() {
     descr_ = ExampleDescr<Type>();
     type_length_ = descr_->type_length();
-    allocator_ = default_allocator();
+    allocator_ = default_memory_pool();
   }
 
   void TearDown() { pool_.FreeAll(); }
@@ -176,7 +179,7 @@ class TestEncodingBase : public ::testing::Test {
 
  protected:
   ChunkedAllocator pool_;
-  MemoryAllocator* allocator_;
+  MemoryPool* allocator_;
 
   int num_values_;
   int type_length_;
@@ -235,7 +238,8 @@ TYPED_TEST(TestPlainEncoding, BasicRoundTrip) {
 // Dictionary encoding tests
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    ByteArrayType, FLBAType> DictEncodedTypes;
+    ByteArrayType, FLBAType>
+    DictEncodedTypes;
 
 template <typename Type>
 class TestDictionaryEncoding : public TestEncodingBase<Type> {
@@ -248,7 +252,7 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
     DictEncoder<Type> encoder(descr_.get(), &pool_);
 
     ASSERT_NO_THROW(encoder.Put(draws_, num_values_));
-    dict_buffer_ = AllocateBuffer(default_allocator(), encoder.dict_encoded_size());
+    dict_buffer_ = AllocateBuffer(default_memory_pool(), encoder.dict_encoded_size());
     encoder.WriteDict(dict_buffer_->mutable_data());
     std::shared_ptr<Buffer> indices = encoder.FlushValues();
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encoding.h b/src/parquet/encoding.h
index 2e2bf68..384971a 100644
--- a/src/parquet/encoding.h
+++ b/src/parquet/encoding.h
@@ -49,7 +49,7 @@ class Encoder {
   virtual void Put(const T* src, int num_values) = 0;
   virtual void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
       int64_t valid_bits_offset) {
-    PoolBuffer buffer(allocator_);
+    PoolBuffer buffer(pool_);
     buffer.Resize(num_values * sizeof(T));
     int32_t num_valid_values = 0;
     INIT_BITSET(valid_bits, valid_bits_offset);
@@ -67,13 +67,13 @@ class Encoder {
 
  protected:
   explicit Encoder(const ColumnDescriptor* descr, const Encoding::type& encoding,
-      MemoryAllocator* allocator)
-      : descr_(descr), encoding_(encoding), allocator_(allocator) {}
+      ::arrow::MemoryPool* pool)
+      : descr_(descr), encoding_(encoding), pool_(pool) {}
 
   // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
   const ColumnDescriptor* descr_;
   const Encoding::type encoding_;
-  MemoryAllocator* allocator_;
+  ::arrow::MemoryPool* pool_;
 };
 
 // The Decoder template is parameterized on parquet::DataType subclasses

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 71dd5c4..7a90eeb 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -106,7 +106,8 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
 };
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    BooleanType, ByteArrayType, FLBAType> TestTypes;
+    BooleanType, ByteArrayType, FLBAType>
+    TestTypes;
 
 TYPED_TEST_CASE(TestSerialize, TestTypes);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 328197a..4eb40b4 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -32,6 +32,8 @@
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
 
+using arrow::MemoryPool;
+
 namespace parquet {
 
 // ----------------------------------------------------------------------
@@ -39,9 +41,9 @@ namespace parquet {
 // assembled in a serialized stream for storing in a Parquet file
 
 SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
-    int64_t total_num_rows, Compression::type codec_type, MemoryAllocator* allocator)
+    int64_t total_num_rows, Compression::type codec_type, MemoryPool* pool)
     : stream_(std::move(stream)),
-      decompression_buffer_(AllocateBuffer(allocator, 0)),
+      decompression_buffer_(AllocateBuffer(pool, 0)),
       seen_num_rows_(0),
       total_num_rows_(total_num_rows) {
   max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
@@ -194,7 +196,7 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
   stream = properties_.GetStream(source_, col_start, col_length);
 
   return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(stream),
-      row_group_metadata_->num_rows(), col->compression(), properties_.allocator()));
+      row_group_metadata_->num_rows(), col->compression(), properties_.memory_pool()));
 }
 
 // ----------------------------------------------------------------------
@@ -268,7 +270,7 @@ void SerializedFile::ParseMetaData() {
   }
 
   std::shared_ptr<PoolBuffer> metadata_buffer =
-      AllocateBuffer(properties_.allocator(), metadata_len);
+      AllocateBuffer(properties_.memory_pool(), metadata_len);
   bytes_read =
       source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data());
   if (bytes_read != metadata_len) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index 9f436ca..0d0fb07 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -45,7 +45,8 @@ static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
 class SerializedPageReader : public PageReader {
  public:
   SerializedPageReader(std::unique_ptr<InputStream> stream, int64_t num_rows,
-      Compression::type codec, MemoryAllocator* allocator = default_allocator());
+      Compression::type codec,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
   virtual ~SerializedPageReader() {}
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index d5b2dce..4e46b8e 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -54,7 +54,7 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
 
   std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
   return ColumnReader::Make(descr, std::move(page_reader),
-      const_cast<ReaderProperties*>(contents_->properties())->allocator());
+      const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
 }
 
 // Returns the rowgroup metadata
@@ -93,7 +93,7 @@ std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string
   if (memory_map) {
     std::shared_ptr<::arrow::io::ReadableFile> handle;
     PARQUET_THROW_NOT_OK(
-        ::arrow::io::ReadableFile::Open(path, props.allocator(), &handle));
+        ::arrow::io::ReadableFile::Open(path, props.memory_pool(), &handle));
     source = handle;
   } else {
     std::shared_ptr<::arrow::io::MemoryMappedFile> handle;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 8c1316b..a0a62b9 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -23,6 +23,8 @@
 #include "parquet/thrift.h"
 #include "parquet/util/memory.h"
 
+using arrow::MemoryPool;
+
 using parquet::schema::GroupNode;
 using parquet::schema::SchemaFlattener;
 
@@ -35,10 +37,10 @@ static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
 // SerializedPageWriter
 
 SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type codec,
-    ColumnChunkMetaDataBuilder* metadata, MemoryAllocator* allocator)
+    ColumnChunkMetaDataBuilder* metadata, MemoryPool* pool)
     : sink_(sink),
       metadata_(metadata),
-      allocator_(allocator),
+      pool_(pool),
       num_values_(0),
       dictionary_page_offset_(0),
       data_page_offset_(0),
@@ -78,7 +80,7 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress(
       compressor_->MaxCompressedLen(buffer->size(), buffer->data());
 
   std::shared_ptr<PoolBuffer> compression_buffer =
-      AllocateBuffer(allocator_, max_compressed_size);
+      AllocateBuffer(pool_, max_compressed_size);
 
   int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(),
       max_compressed_size, compression_buffer->mutable_data());
@@ -168,7 +170,7 @@ ColumnWriter* RowGroupSerializer::NextColumn() {
   const ColumnDescriptor* column_descr = col_meta->descr();
   std::unique_ptr<PageWriter> pager(
       new SerializedPageWriter(sink_, properties_->compression(column_descr->path()),
-          col_meta, properties_->allocator()));
+          col_meta, properties_->memory_pool()));
   current_column_writer_ =
       ColumnWriter::Make(col_meta, std::move(pager), num_rows_, properties_);
   return current_column_writer_.get();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 0140c5b..5bd00be 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -37,7 +37,7 @@ class SerializedPageWriter : public PageWriter {
  public:
   SerializedPageWriter(OutputStream* sink, Compression::type codec,
       ColumnChunkMetaDataBuilder* metadata,
-      MemoryAllocator* allocator = default_allocator());
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
   virtual ~SerializedPageWriter() {}
 
@@ -55,7 +55,7 @@ class SerializedPageWriter : public PageWriter {
  private:
   OutputStream* sink_;
   ColumnChunkMetaDataBuilder* metadata_;
-  MemoryAllocator* allocator_;
+  ::arrow::MemoryPool* pool_;
   int64_t num_values_;
   int64_t dictionary_page_offset_;
   int64_t data_page_offset_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/thrift.h
----------------------------------------------------------------------
diff --git a/src/parquet/thrift.h b/src/parquet/thrift.h
index aafd3f5..7fa0de3 100644
--- a/src/parquet/thrift.h
+++ b/src/parquet/thrift.h
@@ -99,7 +99,8 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali
   boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport(
       new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
   apache::thrift::protocol::TCompactProtocolFactoryT<
-      apache::thrift::transport::TMemoryBuffer> tproto_factory;
+      apache::thrift::transport::TMemoryBuffer>
+      tproto_factory;
   boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
       tproto_factory.getProtocol(tmem_transport);
   try {
@@ -121,7 +122,8 @@ inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
   boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer(
       new apache::thrift::transport::TMemoryBuffer(len));
   apache::thrift::protocol::TCompactProtocolFactoryT<
-      apache::thrift::transport::TMemoryBuffer> tproto_factory;
+      apache::thrift::transport::TMemoryBuffer>
+      tproto_factory;
   boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
       tproto_factory.getProtocol(mem_buffer);
   try {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/util/memory-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc
index 5225261..cdb6d21 100644
--- a/src/parquet/util/memory-test.cc
+++ b/src/parquet/util/memory-test.cc
@@ -27,52 +27,13 @@
 #include "parquet/util/memory.h"
 #include "parquet/util/test-common.h"
 
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+
 namespace parquet {
 
 class TestBuffer : public ::testing::Test {};
 
-TEST(TestAllocator, AllocateFree) {
-  TrackingAllocator allocator;
-
-  uint8_t* data;
-
-  ASSERT_TRUE(allocator.Allocate(100, &data).ok());
-  ASSERT_TRUE(nullptr != data);
-  data[99] = 55;
-  allocator.Free(data, 100);
-
-  ASSERT_TRUE(allocator.Allocate(0, &data).ok());
-  ASSERT_EQ(nullptr, data);
-  allocator.Free(data, 0);
-
-  int64_t to_alloc = std::numeric_limits<int64_t>::max();
-  ASSERT_FALSE(allocator.Allocate(to_alloc, &data).ok());
-}
-
-TEST(TestAllocator, TotalMax) {
-  TrackingAllocator allocator;
-  ASSERT_EQ(0, allocator.bytes_allocated());
-  ASSERT_EQ(0, allocator.max_memory());
-
-  uint8_t* data;
-  uint8_t* data2;
-  ASSERT_TRUE(allocator.Allocate(100, &data).ok());
-  ASSERT_EQ(100, allocator.bytes_allocated());
-  ASSERT_EQ(100, allocator.max_memory());
-
-  ASSERT_TRUE(allocator.Allocate(10, &data2).ok());
-  ASSERT_EQ(110, allocator.bytes_allocated());
-  ASSERT_EQ(110, allocator.max_memory());
-
-  allocator.Free(data, 100);
-  ASSERT_EQ(10, allocator.bytes_allocated());
-  ASSERT_EQ(110, allocator.max_memory());
-
-  allocator.Free(data2, 10);
-  ASSERT_EQ(0, allocator.bytes_allocated());
-  ASSERT_EQ(110, allocator.max_memory());
-}
-
 // Utility class to call private functions on MemPool.
 class ChunkedAllocatorTest {
  public:
@@ -294,7 +255,7 @@ TEST(TestBufferedInputStream, Basics) {
   int64_t stream_offset = 10;
   int64_t stream_size = source_size - stream_offset;
   int64_t chunk_size = 50;
-  std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_allocator(), source_size);
+  std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_memory_pool(), source_size);
   ASSERT_EQ(source_size, buf->size());
   for (int i = 0; i < source_size; i++) {
     buf->mutable_data()[i] = i;
@@ -303,9 +264,8 @@ TEST(TestBufferedInputStream, Basics) {
   auto wrapper =
       std::make_shared<ArrowInputFile>(std::make_shared<::arrow::io::BufferReader>(buf));
 
-  TrackingAllocator allocator;
   std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream(
-      &allocator, chunk_size, wrapper.get(), stream_offset, stream_size));
+      default_memory_pool(), chunk_size, wrapper.get(), stream_offset, stream_size));
 
   const uint8_t* output;
   int64_t bytes_read;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/util/memory.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc
index 744dff9..021a346 100644
--- a/src/parquet/util/memory.cc
+++ b/src/parquet/util/memory.cc
@@ -29,47 +29,13 @@
 #include "parquet/util/bit-util.h"
 #include "parquet/util/logging.h"
 
-namespace parquet {
-
-::arrow::Status TrackingAllocator::Allocate(int64_t size, uint8_t** out) {
-  if (size == 0) {
-    *out = nullptr;
-    return ::arrow::Status::OK();
-  }
-  ARROW_RETURN_NOT_OK(allocator_->Allocate(size, out));
-  const int64_t total_memory = allocator_->bytes_allocated();
-  if (total_memory > max_memory_) { max_memory_ = total_memory; }
-  return ::arrow::Status::OK();
-}
-
-::arrow::Status TrackingAllocator::Reallocate(
-    int64_t old_size, int64_t new_size, uint8_t** out) {
-  ARROW_RETURN_NOT_OK(allocator_->Reallocate(old_size, new_size, out));
-  const int64_t total_memory = allocator_->bytes_allocated();
-  if (total_memory > max_memory_) { max_memory_ = total_memory; }
-  return ::arrow::Status::OK();
-}
-
-void TrackingAllocator::Free(uint8_t* p, int64_t size) {
-  allocator_->Free(p, size);
-}
-
-int64_t TrackingAllocator::max_memory() const {
-  return max_memory_.load();
-}
+using arrow::MemoryPool;
 
-int64_t TrackingAllocator::bytes_allocated() const {
-  return allocator_->bytes_allocated();
-}
-
-MemoryAllocator* default_allocator() {
-  static TrackingAllocator allocator;
-  return &allocator;
-}
+namespace parquet {
 
 template <class T>
-Vector<T>::Vector(int64_t size, MemoryAllocator* allocator)
-    : buffer_(AllocateUniqueBuffer(allocator, size * sizeof(T))),
+Vector<T>::Vector(int64_t size, MemoryPool* pool)
+    : buffer_(AllocateUniqueBuffer(pool, size * sizeof(T))),
       size_(size),
       capacity_(size) {
   if (size > 0) {
@@ -122,13 +88,13 @@ template class Vector<FixedLenByteArray>;
 const int ChunkedAllocator::INITIAL_CHUNK_SIZE;
 const int ChunkedAllocator::MAX_CHUNK_SIZE;
 
-ChunkedAllocator::ChunkedAllocator(MemoryAllocator* allocator)
+ChunkedAllocator::ChunkedAllocator(MemoryPool* pool)
     : current_chunk_idx_(-1),
       next_chunk_size_(INITIAL_CHUNK_SIZE),
       total_allocated_bytes_(0),
       peak_allocated_bytes_(0),
       total_reserved_bytes_(0),
-      allocator_(allocator) {}
+      pool_(pool) {}
 
 ChunkedAllocator::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf)
     : data(buf), size(size), allocated_bytes(0) {}
@@ -137,7 +103,7 @@ ChunkedAllocator::~ChunkedAllocator() {
   int64_t total_bytes_released = 0;
   for (size_t i = 0; i < chunks_.size(); ++i) {
     total_bytes_released += chunks_[i].size;
-    allocator_->Free(chunks_[i].data, chunks_[i].size);
+    pool_->Free(chunks_[i].data, chunks_[i].size);
   }
 
   DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
@@ -190,7 +156,7 @@ void ChunkedAllocator::FreeAll() {
   int64_t total_bytes_released = 0;
   for (size_t i = 0; i < chunks_.size(); ++i) {
     total_bytes_released += chunks_[i].size;
-    allocator_->Free(chunks_[i].data, chunks_[i].size);
+    pool_->Free(chunks_[i].data, chunks_[i].size);
   }
   chunks_.clear();
   next_chunk_size_ = INITIAL_CHUNK_SIZE;
@@ -229,7 +195,7 @@ bool ChunkedAllocator::FindChunk(int64_t min_size) {
 
     // Allocate a new chunk. Return early if malloc fails.
     uint8_t* buf = nullptr;
-    PARQUET_THROW_NOT_OK(allocator_->Allocate(chunk_size, &buf));
+    PARQUET_THROW_NOT_OK(pool_->Allocate(chunk_size, &buf));
     if (UNLIKELY(buf == NULL)) {
       DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
       current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
@@ -452,11 +418,10 @@ void InMemoryInputStream::Advance(int64_t num_bytes) {
 // ----------------------------------------------------------------------
 // In-memory output stream
 
-InMemoryOutputStream::InMemoryOutputStream(
-    MemoryAllocator* allocator, int64_t initial_capacity)
+InMemoryOutputStream::InMemoryOutputStream(MemoryPool* pool, int64_t initial_capacity)
     : size_(0), capacity_(initial_capacity) {
   if (initial_capacity == 0) { initial_capacity = kInMemoryDefaultCapacity; }
-  buffer_ = AllocateBuffer(allocator, initial_capacity);
+  buffer_ = AllocateBuffer(pool, initial_capacity);
 }
 
 InMemoryOutputStream::~InMemoryOutputStream() {}
@@ -492,7 +457,7 @@ std::shared_ptr<Buffer> InMemoryOutputStream::GetBuffer() {
 // ----------------------------------------------------------------------
 // BufferedInputStream
 
-BufferedInputStream::BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
+BufferedInputStream::BufferedInputStream(MemoryPool* pool, int64_t buffer_size,
     RandomAccessSource* source, int64_t start, int64_t num_bytes)
     : source_(source), stream_offset_(start), stream_end_(start + num_bytes) {
   buffer_ = AllocateBuffer(pool, buffer_size);
@@ -534,15 +499,14 @@ void BufferedInputStream::Advance(int64_t num_bytes) {
   buffer_offset_ += num_bytes;
 }
 
-std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryAllocator* allocator, int64_t size) {
-  auto result = std::make_shared<PoolBuffer>(allocator);
+std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryPool* pool, int64_t size) {
+  auto result = std::make_shared<PoolBuffer>(pool);
   if (size > 0) { PARQUET_THROW_NOT_OK(result->Resize(size)); }
   return result;
 }
 
-std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(
-    MemoryAllocator* allocator, int64_t size) {
-  std::unique_ptr<PoolBuffer> result(new PoolBuffer(allocator));
+std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(MemoryPool* pool, int64_t size) {
+  std::unique_ptr<PoolBuffer> result(new PoolBuffer(pool));
   if (size > 0) { PARQUET_THROW_NOT_OK(result->Resize(size)); }
   return result;
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/util/memory.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index e82df95..e2e522d 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -66,31 +66,11 @@ using Buffer = ::arrow::Buffer;
 using MutableBuffer = ::arrow::MutableBuffer;
 using ResizableBuffer = ::arrow::ResizableBuffer;
 using PoolBuffer = ::arrow::PoolBuffer;
-using MemoryAllocator = ::arrow::MemoryPool;
-
-PARQUET_EXPORT MemoryAllocator* default_allocator();
-
-class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator {
- public:
-  explicit TrackingAllocator(MemoryAllocator* allocator = ::arrow::default_memory_pool())
-      : allocator_(allocator), max_memory_(0) {}
-
-  ::arrow::Status Allocate(int64_t size, uint8_t** out) override;
-  ::arrow::Status Reallocate(int64_t old_size, int64_t new_size, uint8_t** ptr) override;
-  void Free(uint8_t* p, int64_t size) override;
-
-  int64_t max_memory() const;
-  int64_t bytes_allocated() const override;
-
- private:
-  MemoryAllocator* allocator_;
-  std::atomic<int64_t> max_memory_;
-};
 
 template <class T>
 class Vector {
  public:
-  explicit Vector(int64_t size, MemoryAllocator* allocator);
+  explicit Vector(int64_t size, ::arrow::MemoryPool* pool);
   void Resize(int64_t new_size);
   void Reserve(int64_t new_capacity);
   void Assign(int64_t size, const T val);
@@ -149,7 +129,7 @@ class Vector {
 
 class ChunkedAllocator {
  public:
-  explicit ChunkedAllocator(MemoryAllocator* allocator = default_allocator());
+  explicit ChunkedAllocator(::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
 
   /// Frees all chunks of memory and subtracts the total allocated bytes
   /// from the registered limits.
@@ -227,7 +207,7 @@ class ChunkedAllocator {
 
   std::vector<ChunkInfo> chunks_;
 
-  MemoryAllocator* allocator_;
+  ::arrow::MemoryPool* pool_;
 
   /// Find or allocated a chunk with at least min_size spare capacity and update
   /// current_chunk_idx_. Also updates chunks_, chunk_sizes_ and allocated_bytes_
@@ -347,7 +327,8 @@ class PARQUET_EXPORT ArrowOutputStream : public ArrowFileMethods, public OutputS
 
 class PARQUET_EXPORT InMemoryOutputStream : public OutputStream {
  public:
-  explicit InMemoryOutputStream(MemoryAllocator* allocator = default_allocator(),
+  explicit InMemoryOutputStream(
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
       int64_t initial_capacity = kInMemoryDefaultCapacity);
 
   virtual ~InMemoryOutputStream();
@@ -421,7 +402,7 @@ class InMemoryInputStream : public InputStream {
 // Implementation of an InputStream when only some of the bytes are in memory.
 class BufferedInputStream : public InputStream {
  public:
-  BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
+  BufferedInputStream(::arrow::MemoryPool* pool, int64_t buffer_size,
       RandomAccessSource* source, int64_t start, int64_t end);
   virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
   virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
@@ -437,10 +418,10 @@ class BufferedInputStream : public InputStream {
   int64_t buffer_size_;
 };
 
-std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryAllocator* allocator, int64_t size = 0);
+std::shared_ptr<PoolBuffer> AllocateBuffer(::arrow::MemoryPool* pool, int64_t size = 0);
 
 std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(
-    MemoryAllocator* allocator, int64_t size = 0);
+    ::arrow::MemoryPool* pool, int64_t size = 0);
 
 }  // namespace parquet
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e59bc5c/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
index edadb53..2327aeb 100644
--- a/src/parquet/util/test-common.h
+++ b/src/parquet/util/test-common.h
@@ -32,7 +32,8 @@ namespace parquet {
 namespace test {
 
 typedef ::testing::Types<BooleanType, Int32Type, Int64Type, Int96Type, FloatType,
-    DoubleType, ByteArrayType, FLBAType> ParquetTypes;
+    DoubleType, ByteArrayType, FLBAType>
+    ParquetTypes;
 
 template <typename T>
 static inline void assert_vector_equal(const vector<T>& left, const vector<T>& right) {


Mime
View raw message