parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-666: Add support for writing dictionaries
Date Sun, 28 Aug 2016 22:39:48 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master c2e5f08cb -> 0a0323d85


PARQUET-666: Add support for writing dictionaries

Working version but still some minor stuff to clean up (but I'd be happy for comments).

Had to change the Encoder interface to make it work with dictionary encoding. Also, dictionary-encoded RowGroups are currently limited to a single DataPage.

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #142 from xhochy/parquet-666 and squashes the following commits:

95e4753 [Uwe L. Korn] Fix PARQUET-591 and make more use of WriterProperties
8f5b2bb [Uwe L. Korn] Comment out unused private variable
32adfd9 [Uwe L. Korn] Remove compression code duplication
6804e2a [Uwe L. Korn] Remove deprecated TODO
0d58ef2 [Uwe L. Korn] Pass DictionaryPage objects to the page writer
95fdccd [Uwe L. Korn] Get rid of temporarily created struct
cce17f9 [Uwe L. Korn] Store serialised pages
2841494 [Uwe L. Korn] Remove unsigned/signed comparison
7ce98ca [Uwe L. Korn] Store pages in RAM until the end of the column
17533bf [Uwe L. Korn] Remove code duplication
a3b4043 [Uwe L. Korn] PARQUET-666: Add support for writing dictionaries


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/0a0323d8
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/0a0323d8
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/0a0323d8

Branch: refs/heads/master
Commit: 0a0323d850a00f168b5e6e2b51df607d63323846
Parents: c2e5f08
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Sun Aug 28 18:39:40 2016 -0400
Committer: Wes McKinney <wesm@apache.org>
Committed: Sun Aug 28 18:39:40 2016 -0400

----------------------------------------------------------------------
 src/parquet/column/column-io-benchmark.cc   |  12 +-
 src/parquet/column/column-writer-test.cc    |  20 ++-
 src/parquet/column/page.h                   |  11 +-
 src/parquet/column/properties-test.cc       |   1 -
 src/parquet/column/properties.h             |  48 +++----
 src/parquet/column/test-util.h              |  26 ++--
 src/parquet/column/writer.cc                |  95 +++++++++----
 src/parquet/column/writer.h                 |  41 ++++--
 src/parquet/encodings/dictionary-encoding.h | 161 +++++++++++++----------
 src/parquet/encodings/encoder.h             |   6 +-
 src/parquet/encodings/encoding-benchmark.cc |  20 ++-
 src/parquet/encodings/encoding-test.cc      |  35 ++---
 src/parquet/encodings/plain-encoding.h      |  62 +++++++--
 src/parquet/file/writer-internal.cc         |  99 ++++++++------
 src/parquet/file/writer-internal.h          |  23 ++--
 src/parquet/util/mem-pool.cc                |  35 +++++
 src/parquet/util/mem-pool.h                 |  33 +----
 17 files changed, 428 insertions(+), 300 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/column/column-io-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-io-benchmark.cc b/src/parquet/column/column-io-benchmark.cc
index 10272b2..74d7349 100644
--- a/src/parquet/column/column-io-benchmark.cc
+++ b/src/parquet/column/column-io-benchmark.cc
@@ -31,11 +31,11 @@ using schema::PrimitiveNode;
 namespace benchmark {
 
 std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size, OutputStream* dst,
-    ColumnChunk* metadata, ColumnDescriptor* schema) {
+    ColumnChunk* metadata, ColumnDescriptor* schema, const WriterProperties* properties) {
   std::unique_ptr<SerializedPageWriter> pager(
       new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata));
-  return std::unique_ptr<Int64Writer>(
-      new Int64Writer(schema, std::move(pager), output_size, Encoding::PLAIN));
+  return std::unique_ptr<Int64Writer>(new Int64Writer(
+      schema, std::move(pager), output_size, Encoding::PLAIN, properties));
 }
 
 std::shared_ptr<ColumnDescriptor> Int64Schema(Repetition::type repetition) {
@@ -62,11 +62,12 @@ static void BM_WriteInt64Column(::benchmark::State& state) {
   std::vector<int16_t> definition_levels(state.range_x(), 1);
   std::vector<int16_t> repetition_levels(state.range_x(), 0);
   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
+  std::shared_ptr<parquet::WriterProperties> properties = default_writer_properties();
 
   while (state.KeepRunning()) {
     InMemoryOutputStream dst;
     std::unique_ptr<Int64Writer> writer =
-        BuildWriter(state.range_x(), &dst, &metadata, schema.get());
+        BuildWriter(state.range_x(), &dst, &metadata, schema.get(), properties.get());
     writer->WriteBatch(
         values.size(), definition_levels.data(), repetition_levels.data(), values.data());
     writer->Close();
@@ -97,8 +98,9 @@ static void BM_ReadInt64Column(::benchmark::State& state) {
   std::shared_ptr<ColumnDescriptor> schema = Int64Schema(repetition);
 
   InMemoryOutputStream dst;
+  std::shared_ptr<parquet::WriterProperties> properties = default_writer_properties();
   std::unique_ptr<Int64Writer> writer =
-      BuildWriter(state.range_x(), &dst, &metadata, schema.get());
+      BuildWriter(state.range_x(), &dst, &metadata, schema.get(), properties.get());
   writer->WriteBatch(
       values.size(), definition_levels.data(), repetition_levels.data(), values.data());
   writer->Close();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index f78ab5a..f2c9c64 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -69,6 +69,7 @@ class TestPrimitiveWriter : public ::testing::Test {
 
   void SetUp() {
     SetupValuesOut();
+    writer_properties_ = default_writer_properties();
     definition_levels_out_.resize(SMALL_SIZE);
     repetition_levels_out_.resize(SMALL_SIZE);
 
@@ -88,8 +89,9 @@ class TestPrimitiveWriter : public ::testing::Test {
     sink_.reset(new InMemoryOutputStream());
     std::unique_ptr<SerializedPageWriter> pager(
         new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_));
-    return std::unique_ptr<TypedColumnWriter<TestType>>(new TypedColumnWriter<TestType>(
-        schema_.get(), std::move(pager), output_size, encoding));
+    return std::unique_ptr<TypedColumnWriter<TestType>>(
+        new TypedColumnWriter<TestType>(schema_.get(), std::move(pager), output_size,
+            encoding, writer_properties_.get()));
   }
 
   void SyncValuesOut();
@@ -139,6 +141,7 @@ class TestPrimitiveWriter : public ::testing::Test {
   format::ColumnChunk metadata_;
   std::shared_ptr<ColumnDescriptor> schema_;
   std::unique_ptr<InMemoryOutputStream> sink_;
+  std::shared_ptr<WriterProperties> writer_properties_;
 };
 
 template <typename TestType>
@@ -186,15 +189,24 @@ typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
 
 TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
 
+// Dictionary encoding for booleans is not supported.
+typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+    ByteArrayType, FLBAType> TestDictionaryTypes;
+
+template <typename T>
+class TestPrimitiveDictionaryWriter : public TestPrimitiveWriter<T> {};
+
+TYPED_TEST_CASE(TestPrimitiveDictionaryWriter, TestDictionaryTypes);
+
 TYPED_TEST(TestPrimitiveWriter, RequiredPlain) {
   this->TestRequiredWithEncoding(Encoding::PLAIN);
 }
 
-/*
-TYPED_TEST(TestPrimitiveWriter, RequiredDictionary) {
+TYPED_TEST(TestPrimitiveDictionaryWriter, RequiredDictionary) {
   this->TestRequiredWithEncoding(Encoding::PLAIN_DICTIONARY);
 }
 
+/*
 TYPED_TEST(TestPrimitiveWriter, RequiredRLE) {
   this->TestRequiredWithEncoding(Encoding::RLE);
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 8bad57f..13dec2c 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -45,6 +45,8 @@ class Page {
 
   PageType::type type() const { return type_; }
 
+  std::shared_ptr<Buffer> buffer() const { return buffer_; }
+
   // @returns: a pointer to the page's data
   const uint8_t* data() const { return buffer_->data(); }
 
@@ -171,12 +173,9 @@ class PageWriter {
 
   virtual void Close() = 0;
 
-  virtual int64_t WriteDataPage(int32_t num_rows, int32_t num_values,
-      const std::shared_ptr<Buffer>& definition_levels,
-      Encoding::type definition_level_encoding,
-      const std::shared_ptr<Buffer>& repetition_levels,
-      Encoding::type repetition_level_encoding, const std::shared_ptr<Buffer>& values,
-      Encoding::type encoding) = 0;
+  virtual int64_t WriteDataPage(const DataPage& page) = 0;
+
+  virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
 };
 
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/column/properties-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties-test.cc b/src/parquet/column/properties-test.cc
index 13a155a..f1eeaf3 100644
--- a/src/parquet/column/properties-test.cc
+++ b/src/parquet/column/properties-test.cc
@@ -38,7 +38,6 @@ TEST(TestWriterProperties, Basics) {
 
   ASSERT_EQ(DEFAULT_PAGE_SIZE, props->data_pagesize());
   ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props->dictionary_pagesize());
-  ASSERT_EQ(DEFAULT_IS_DICTIONARY_ENABLED, props->dictionary_enabled());
   ASSERT_EQ(DEFAULT_WRITER_VERSION, props->version());
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
index 848cf59..78478ad 100644
--- a/src/parquet/column/properties.h
+++ b/src/parquet/column/properties.h
@@ -77,9 +77,8 @@ class PARQUET_EXPORT ReaderProperties {
 
 ReaderProperties PARQUET_EXPORT default_reader_properties();
 
-static int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
+static int64_t DEFAULT_PAGE_SIZE = 64 * 1024 * 1024;
 static int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
-static bool DEFAULT_IS_DICTIONARY_ENABLED = true;
 static Encoding::type DEFAULT_ENCODING = Encoding::PLAIN;
 static constexpr ParquetVersion::type DEFAULT_WRITER_VERSION =
     ParquetVersion::PARQUET_1_0;
@@ -93,7 +92,6 @@ class PARQUET_EXPORT WriterProperties {
    public:
     Builder()
         : allocator_(default_allocator()),
-          dictionary_enabled_(DEFAULT_IS_DICTIONARY_ENABLED),
           dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
           default_encoding_(DEFAULT_ENCODING),
           pagesize_(DEFAULT_PAGE_SIZE),
@@ -116,16 +114,6 @@ class PARQUET_EXPORT WriterProperties {
       return this;
     }
 
-    Builder* enable_dictionary() {
-      dictionary_enabled_ = true;
-      return this;
-    }
-
-    Builder* disable_dictionary() {
-      dictionary_enabled_ = false;
-      return this;
-    }
-
     Builder* encoding(
         const std::shared_ptr<schema::ColumnPath>& path, Encoding::type encoding_type) {
       return encoding(path->ToDotString(), encoding_type);
@@ -162,14 +150,13 @@ class PARQUET_EXPORT WriterProperties {
     }
 
     std::shared_ptr<WriterProperties> build() {
-      return std::shared_ptr<WriterProperties>(new WriterProperties(allocator_,
-          dictionary_enabled_, dictionary_pagesize_, default_encoding_, encodings_,
-          pagesize_, version_, default_codec_, codecs_));
+      return std::shared_ptr<WriterProperties>(
+          new WriterProperties(allocator_, dictionary_pagesize_, default_encoding_,
+              encodings_, pagesize_, version_, default_codec_, codecs_));
     }
 
    private:
     MemoryAllocator* allocator_;
-    bool dictionary_enabled_;
     int64_t dictionary_pagesize_;
     // Encoding used for each column if not a specialized one is defined as
     // part of encodings_
@@ -185,8 +172,6 @@ class PARQUET_EXPORT WriterProperties {
 
   MemoryAllocator* allocator() const { return allocator_; }
 
-  bool dictionary_enabled() const { return dictionary_enabled_; }
-
   int64_t dictionary_pagesize() const { return dictionary_pagesize_; }
 
   int64_t data_pagesize() const { return pagesize_; }
@@ -194,9 +179,19 @@ class PARQUET_EXPORT WriterProperties {
   ParquetVersion::type version() const { return parquet_version_; }
 
   Encoding::type encoding(const std::shared_ptr<schema::ColumnPath>& path) const {
+    Encoding::type coding = default_encoding_;
     auto it = encodings_.find(path->ToDotString());
-    if (it != encodings_.end()) { return it->second; }
-    return default_encoding_;
+    if (it != encodings_.end()) { coding = it->second; }
+
+    // Use the correct enum value for dictionary coding based on the used Parquet version
+    if (coding == Encoding::PLAIN_DICTIONARY || coding == Encoding::RLE_DICTIONARY) {
+      if (parquet_version_ == ParquetVersion::PARQUET_1_0) {
+        return Encoding::PLAIN_DICTIONARY;
+      } else {
+        return Encoding::RLE_DICTIONARY;
+      }
+    }
+    return coding;
   }
 
   Compression::type compression(const std::shared_ptr<schema::ColumnPath>& path) const {
@@ -206,26 +201,21 @@ class PARQUET_EXPORT WriterProperties {
   }
 
  private:
-  explicit WriterProperties(MemoryAllocator* allocator, bool dictionary_enabled,
-      int64_t dictionary_pagesize, Encoding::type default_encoding,
+  explicit WriterProperties(MemoryAllocator* allocator, int64_t dictionary_pagesize,
+      Encoding::type default_encoding,
       const std::unordered_map<std::string, Encoding::type>& encodings, int64_t pagesize,
       ParquetVersion::type version, Compression::type default_codec,
       const ColumnCodecs& codecs)
       : allocator_(allocator),
-        dictionary_enabled_(dictionary_enabled),
         dictionary_pagesize_(dictionary_pagesize),
         default_encoding_(default_encoding),
         encodings_(encodings),
         pagesize_(pagesize),
         parquet_version_(version),
         default_codec_(default_codec),
-        codecs_(codecs) {
-    pagesize_ = DEFAULT_PAGE_SIZE;
-    dictionary_enabled_ = DEFAULT_IS_DICTIONARY_ENABLED;
-  }
+        codecs_(codecs) {}
 
   MemoryAllocator* allocator_;
-  bool dictionary_enabled_;
   int64_t dictionary_pagesize_;
   Encoding::type default_encoding_;
   std::unordered_map<std::string, Encoding::type> encodings_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index ba24fb2..b11e620 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -132,7 +132,9 @@ class DataPageBuilder {
   void AppendValues(const ColumnDescriptor* d, const vector<T>& values,
       Encoding::type encoding = Encoding::PLAIN) {
     PlainEncoder<Type> encoder(d);
-    encoder.Encode(&values[0], values.size(), sink_);
+    encoder.Put(&values[0], values.size());
+    std::shared_ptr<Buffer> values_sink = encoder.FlushValues();
+    sink_->Write(values_sink->data(), values_sink->size());
 
     num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
     encoding_ = encoding;
@@ -191,7 +193,9 @@ void DataPageBuilder<BooleanType>::AppendValues(
     ParquetException::NYI("only plain encoding currently implemented");
   }
   PlainEncoder<BooleanType> encoder(d);
-  encoder.Encode(values, values.size(), sink_);
+  encoder.Put(values, values.size());
+  std::shared_ptr<Buffer> buffer = encoder.FlushValues();
+  sink_->Write(buffer->data(), buffer->size());
 
   num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
   encoding_ = encoding;
@@ -234,9 +238,7 @@ class DictionaryPageBuilder {
   // This class writes data and metadata to the passed inputs
   explicit DictionaryPageBuilder(const ColumnDescriptor* d)
       : num_dict_values_(0), have_values_(false) {
-    int type_length = 0;
-    if (TN == Type::FIXED_LEN_BYTE_ARRAY) { type_length = d->type_length(); }
-    encoder_.reset(new DictEncoder<TC>(&pool_, default_allocator(), type_length));
+    encoder_.reset(new DictEncoder<TYPE>(d, &pool_));
   }
 
   ~DictionaryPageBuilder() { pool_.FreeAll(); }
@@ -244,18 +246,10 @@ class DictionaryPageBuilder {
   shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
     int num_values = values.size();
     // Dictionary encoding
-    for (int i = 0; i < num_values; ++i) {
-      encoder_->Put(values[i]);
-    }
+    encoder_->Put(values.data(), num_values);
     num_dict_values_ = encoder_->num_entries();
     have_values_ = true;
-    shared_ptr<OwnedMutableBuffer> rle_indices = std::make_shared<OwnedMutableBuffer>(
-        sizeof(int) * encoder_->EstimatedDataEncodedSize());
-    int actual_bytes =
-        encoder_->WriteIndices(rle_indices->mutable_data(), rle_indices->size());
-    rle_indices->Resize(actual_bytes);
-    encoder_->ClearIndices();
-    return rle_indices;
+    return encoder_->FlushValues();
   }
 
   shared_ptr<Buffer> WriteDict() {
@@ -269,7 +263,7 @@ class DictionaryPageBuilder {
 
  private:
   MemPool pool_;
-  shared_ptr<DictEncoder<TC>> encoder_;
+  shared_ptr<DictEncoder<TYPE>> encoder_;
   int32_t num_dict_values_;
   bool have_values_;
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 482265e..7845c58 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -18,6 +18,7 @@
 #include "parquet/column/writer.h"
 
 #include "parquet/column/properties.h"
+#include "parquet/encodings/dictionary-encoding.h"
 #include "parquet/encodings/plain-encoding.h"
 
 namespace parquet {
@@ -32,11 +33,16 @@ std::shared_ptr<WriterProperties> default_writer_properties() {
 }
 
 ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
-    std::unique_ptr<PageWriter> pager, int64_t expected_rows, MemoryAllocator* allocator)
+    std::unique_ptr<PageWriter> pager, int64_t expected_rows, bool has_dictionary,
+    Encoding::type encoding, const WriterProperties* properties)
     : descr_(descr),
       pager_(std::move(pager)),
       expected_rows_(expected_rows),
-      allocator_(allocator),
+      has_dictionary_(has_dictionary),
+      encoding_(encoding),
+      properties_(properties),
+      allocator_(properties->allocator()),
+      pool_(properties->allocator()),
       num_buffered_values_(0),
       num_buffered_encoded_values_(0),
       num_rows_(0),
@@ -47,7 +53,6 @@ ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
 void ColumnWriter::InitSinks() {
   definition_levels_sink_.reset(new InMemoryOutputStream());
   repetition_levels_sink_.reset(new InMemoryOutputStream());
-  values_sink_.reset(new InMemoryOutputStream());
 }
 
 void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
@@ -80,11 +85,10 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels(
   return std::static_pointer_cast<Buffer>(buffer_rle);
 }
 
-void ColumnWriter::WriteNewPage() {
-  // TODO: Currently we only support writing DataPages
+void ColumnWriter::AddDataPage() {
   std::shared_ptr<Buffer> definition_levels = definition_levels_sink_->GetBuffer();
   std::shared_ptr<Buffer> repetition_levels = repetition_levels_sink_->GetBuffer();
-  std::shared_ptr<Buffer> values = values_sink_->GetBuffer();
+  std::shared_ptr<Buffer> values = GetValuesBuffer();
 
   if (descr_->max_definition_level() > 0) {
     definition_levels =
@@ -96,11 +100,22 @@ void ColumnWriter::WriteNewPage() {
         RleEncodeLevels(repetition_levels, descr_->max_repetition_level());
   }
 
-  // TODO(PARQUET-590): Encodings are hard-coded
-  int64_t bytes_written = pager_->WriteDataPage(num_buffered_values_,
-      num_buffered_encoded_values_, definition_levels, Encoding::RLE, repetition_levels,
-      Encoding::RLE, values, Encoding::PLAIN);
-  total_bytes_written_ += bytes_written;
+  int64_t uncompressed_size =
+      definition_levels->size() + repetition_levels->size() + values->size();
+
+  // Concatenate data into a single buffer
+  std::shared_ptr<OwnedMutableBuffer> uncompressed_data =
+      std::make_shared<OwnedMutableBuffer>(uncompressed_size, allocator_);
+  uint8_t* uncompressed_ptr = uncompressed_data->mutable_data();
+  memcpy(uncompressed_ptr, repetition_levels->data(), repetition_levels->size());
+  uncompressed_ptr += repetition_levels->size();
+  memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size());
+  uncompressed_ptr += definition_levels->size();
+  memcpy(uncompressed_ptr, values->data(), values->size());
+  DataPage page(
+      uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE);
+
+  data_pages_.push_back(std::move(page));
 
   // Re-initialize the sinks as GetBuffer made them invalid.
   InitSinks();
@@ -108,9 +123,19 @@ void ColumnWriter::WriteNewPage() {
   num_buffered_encoded_values_ = 0;
 }
 
+void ColumnWriter::WriteDataPage(const DataPage& page) {
+  int64_t bytes_written = pager_->WriteDataPage(page);
+  total_bytes_written_ += bytes_written;
+}
+
 int64_t ColumnWriter::Close() {
+  if (has_dictionary_) { WriteDictionaryPage(); }
   // Write all outstanding data to a new page
-  if (num_buffered_values_ > 0) { WriteNewPage(); }
+  if (num_buffered_values_ > 0) { AddDataPage(); }
+
+  for (size_t i = 0; i < data_pages_.size(); i++) {
+    WriteDataPage(data_pages_[i]);
+  }
 
   if (num_rows_ != expected_rows_) {
     throw ParquetException(
@@ -129,18 +154,40 @@ int64_t ColumnWriter::Close() {
 template <typename Type>
 TypedColumnWriter<Type>::TypedColumnWriter(const ColumnDescriptor* schema,
     std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
-    MemoryAllocator* allocator)
-    : ColumnWriter(schema, std::move(pager), expected_rows, allocator) {
+    const WriterProperties* properties)
+    : ColumnWriter(schema, std::move(pager), expected_rows,
+          (encoding == Encoding::PLAIN_DICTIONARY ||
+                       encoding == Encoding::RLE_DICTIONARY),
+          encoding, properties) {
   switch (encoding) {
     case Encoding::PLAIN:
-      current_encoder_ =
-          std::unique_ptr<EncoderType>(new PlainEncoder<Type>(schema, allocator));
+      current_encoder_ = std::unique_ptr<EncoderType>(
+          new PlainEncoder<Type>(schema, properties->allocator()));
+      break;
+    case Encoding::PLAIN_DICTIONARY:
+    case Encoding::RLE_DICTIONARY:
+      current_encoder_ = std::unique_ptr<EncoderType>(
+          new DictEncoder<Type>(schema, &pool_, properties->allocator()));
       break;
     default:
       ParquetException::NYI("Selected encoding is not supported");
   }
 }
 
+template <typename Type>
+void TypedColumnWriter<Type>::WriteDictionaryPage() {
+  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
+  auto buffer = std::make_shared<OwnedMutableBuffer>(dict_encoder->dict_encoded_size());
+  dict_encoder->WriteDict(buffer->mutable_data());
+  // TODO Get rid of this deep call
+  dict_encoder->mem_pool()->FreeAll();
+
+  Encoding::type dict_encoding = Encoding::PLAIN_DICTIONARY;
+  if (encoding_ == Encoding::RLE_DICTIONARY) { dict_encoding = Encoding::PLAIN; }
+  DictionaryPage page(buffer, dict_encoder->num_entries(), dict_encoding);
+  total_bytes_written_ += pager_->WriteDictionaryPage(page);
+}
+
 // ----------------------------------------------------------------------
 // Dynamic column writer constructor
 
@@ -151,28 +198,28 @@ std::shared_ptr<ColumnWriter> ColumnWriter::Make(const ColumnDescriptor* descr,
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
       return std::make_shared<BoolWriter>(
-          descr, std::move(pager), expected_rows, encoding, properties->allocator());
+          descr, std::move(pager), expected_rows, encoding, properties);
     case Type::INT32:
       return std::make_shared<Int32Writer>(
-          descr, std::move(pager), expected_rows, encoding, properties->allocator());
+          descr, std::move(pager), expected_rows, encoding, properties);
     case Type::INT64:
       return std::make_shared<Int64Writer>(
-          descr, std::move(pager), expected_rows, encoding, properties->allocator());
+          descr, std::move(pager), expected_rows, encoding, properties);
     case Type::INT96:
       return std::make_shared<Int96Writer>(
-          descr, std::move(pager), expected_rows, encoding, properties->allocator());
+          descr, std::move(pager), expected_rows, encoding, properties);
     case Type::FLOAT:
       return std::make_shared<FloatWriter>(
-          descr, std::move(pager), expected_rows, encoding, properties->allocator());
+          descr, std::move(pager), expected_rows, encoding, properties);
     case Type::DOUBLE:
       return std::make_shared<DoubleWriter>(
-          descr, std::move(pager), expected_rows, encoding, properties->allocator());
+          descr, std::move(pager), expected_rows, encoding, properties);
     case Type::BYTE_ARRAY:
       return std::make_shared<ByteArrayWriter>(
-          descr, std::move(pager), expected_rows, encoding, properties->allocator());
+          descr, std::move(pager), expected_rows, encoding, properties);
     case Type::FIXED_LEN_BYTE_ARRAY:
       return std::make_shared<FixedLenByteArrayWriter>(
-          descr, std::move(pager), expected_rows, encoding, properties->allocator());
+          descr, std::move(pager), expected_rows, encoding, properties);
     default:
       ParquetException::NYI("type reader not implemented");
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index be441b8..f7834f5 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -18,6 +18,8 @@
 #ifndef PARQUET_COLUMN_WRITER_H
 #define PARQUET_COLUMN_WRITER_H
 
+#include <vector>
+
 #include "parquet/column/levels.h"
 #include "parquet/column/page.h"
 #include "parquet/column/properties.h"
@@ -25,6 +27,7 @@
 #include "parquet/schema/descriptor.h"
 #include "parquet/types.h"
 #include "parquet/util/mem-allocator.h"
+#include "parquet/util/mem-pool.h"
 #include "parquet/util/output.h"
 #include "parquet/util/visibility.h"
 
@@ -33,7 +36,8 @@ namespace parquet {
 class PARQUET_EXPORT ColumnWriter {
  public:
   ColumnWriter(const ColumnDescriptor*, std::unique_ptr<PageWriter>,
-      int64_t expected_rows, MemoryAllocator* allocator = default_allocator());
+      int64_t expected_rows, bool has_dictionary, Encoding::type encoding,
+      const WriterProperties* properties);
 
   static std::shared_ptr<ColumnWriter> Make(const ColumnDescriptor*,
       std::unique_ptr<PageWriter>, int64_t expected_rows,
@@ -51,7 +55,11 @@ class PARQUET_EXPORT ColumnWriter {
   int64_t Close();
 
  protected:
-  void WriteNewPage();
+  virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
+  virtual void WriteDictionaryPage() = 0;
+
+  void AddDataPage();
+  void WriteDataPage(const DataPage& page);
 
   // Write multiple definition levels
   void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
@@ -68,10 +76,14 @@ class PARQUET_EXPORT ColumnWriter {
 
   // The number of rows that should be written in this column chunk.
   int64_t expected_rows_;
+  bool has_dictionary_;
+  Encoding::type encoding_;
+  const WriterProperties* properties_;
 
   LevelEncoder level_encoder_;
 
   MemoryAllocator* allocator_;
+  MemPool pool_;
 
   // The total number of values stored in the data page. This is the maximum of
   // the number of encoded definition levels or encoded values. For
@@ -92,10 +104,11 @@ class PARQUET_EXPORT ColumnWriter {
 
   std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
   std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
-  std::unique_ptr<InMemoryOutputStream> values_sink_;
 
  private:
   void InitSinks();
+
+  std::vector<DataPage> data_pages_;
 };
 
 // API to write values to a single column. This is the main client facing API.
@@ -105,14 +118,19 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
   typedef typename DType::c_type T;
 
   TypedColumnWriter(const ColumnDescriptor* schema, std::unique_ptr<PageWriter> pager,
-      int64_t expected_rows, Encoding::type encoding,
-      MemoryAllocator* allocator = default_allocator());
+      int64_t expected_rows, Encoding::type encoding, const WriterProperties* properties);
 
   // Write a batch of repetition levels, definition levels, and values to the
   // column.
   void WriteBatch(int64_t num_values, const int16_t* def_levels,
       const int16_t* rep_levels, const T* values);
 
+ protected:
+  std::shared_ptr<Buffer> GetValuesBuffer() override {
+    return current_encoder_->FlushValues();
+  }
+  void WriteDictionaryPage() override;
+
  private:
   typedef Encoder<DType> EncoderType;
 
@@ -124,15 +142,9 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
   // plain-encoded data.
   std::unordered_map<int, std::shared_ptr<EncoderType>> encoders_;
 
-  void ConfigureDictionary(const DictionaryPage* page);
-
   std::unique_ptr<EncoderType> current_encoder_;
 };
 
-// TODO(PARQUET-591): This is just chosen at random, we should make better estimates.
-// See also: parquet-column/../column/impl/ColumnWriteStoreV2.java:sizeCheck
-const int64_t PAGE_VALUE_COUNT = 1000;
-
 template <typename DType>
 inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
     const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
@@ -173,13 +185,14 @@ inline void TypedColumnWriter<DType>::WriteBatch(int64_t num_values,
   num_buffered_values_ += num_values;
   num_buffered_encoded_values_ += values_to_write;
 
-  // TODO(PARQUET-591): Instead of rows as a boundary, do a size check
-  if (num_buffered_values_ >= PAGE_VALUE_COUNT) { WriteNewPage(); }
+  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+    AddDataPage();
+  }
 }
 
 template <typename DType>
 void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) {
-  current_encoder_->Encode(values, num_values, values_sink_.get());
+  current_encoder_->Put(values, num_values);
 }
 
 typedef TypedColumnWriter<BooleanType> BoolWriter;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 8e121ee..465a0e4 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -153,23 +153,36 @@ static constexpr double MAX_HASH_LOAD = 0.7;
 /// the dictionary is being constructed. At any time, the buffered values can be
 /// written out with the current dictionary size. More values can then be added to
 /// the encoder, including new dictionary entries.
-class DictEncoderBase {
+template <typename DType>
+class DictEncoder : public Encoder<DType> {
  public:
-  virtual ~DictEncoderBase() { DCHECK(buffered_indices_.empty()); }
+  typedef typename DType::c_type T;
 
-  /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
-  /// dict_encoded_size() bytes.
-  virtual void WriteDict(uint8_t* buffer) = 0;
+  explicit DictEncoder(const ColumnDescriptor* desc, MemPool* pool = nullptr,
+      MemoryAllocator* allocator = default_allocator())
+      : Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator),
+        allocator_(allocator),
+        pool_(pool),
+        hash_table_size_(INITIAL_HASH_TABLE_SIZE),
+        mod_bitmask_(hash_table_size_ - 1),
+        hash_slots_(0, allocator),
+        dict_encoded_size_(0),
+        type_length_(desc->type_length()) {
+    hash_slots_.Assign(hash_table_size_, HASH_SLOT_EMPTY);
+    if (!CpuInfo::initialized()) { CpuInfo::Init(); }
+  }
 
-  /// The number of entries in the dictionary.
-  virtual int num_entries() const = 0;
+  virtual ~DictEncoder() { DCHECK(buffered_indices_.empty()); }
 
-  /// Clears all the indices (but leaves the dictionary).
-  void ClearIndices() { buffered_indices_.clear(); }
+  // TODO(wesm): think about how to address the construction semantics in
+  // encodings/dictionary-encoding.h
+  void set_mem_pool(MemPool* pool) { pool_ = pool; }
+
+  void set_type_length(int type_length) { type_length_ = type_length; }
 
   /// Returns a conservative estimate of the number of bytes needed to encode the buffered
   /// indices. Used to size the buffer passed to WriteIndices().
-  int EstimatedDataEncodedSize() {
+  int64_t EstimatedDataEncodedSize() override {
     // Note: because of the way RleEncoder::CheckBufferFull() is called, we have to
     // reserve
     // an extra "RleEncoder::MinBufferSize" bytes. These extra bytes won't be used
@@ -194,19 +207,43 @@ class DictEncoderBase {
 
   int hash_table_size() { return hash_table_size_; }
   int dict_encoded_size() { return dict_encoded_size_; }
+  /// Clears all the indices (but leaves the dictionary).
+  void ClearIndices() { buffered_indices_.clear(); }
 
- protected:
-  explicit DictEncoderBase(MemPool* pool, MemoryAllocator* allocator)
-      : hash_table_size_(INITIAL_HASH_TABLE_SIZE),
-        mod_bitmask_(hash_table_size_ - 1),
-        hash_slots_(0, allocator),
-        allocator_(allocator),
-        pool_(pool),
-        dict_encoded_size_(0) {
-    hash_slots_.Assign(hash_table_size_, HASH_SLOT_EMPTY);
-    if (!CpuInfo::initialized()) { CpuInfo::Init(); }
+  /// Encode value. Note that this does not actually write any data, just
+  /// buffers the value's index to be written later.
+  void Put(const T& value);
+
+  std::shared_ptr<Buffer> FlushValues() override {
+    auto buffer = std::make_shared<OwnedMutableBuffer>(
+        EstimatedDataEncodedSize(), this->allocator_);
+    int result_size = WriteIndices(buffer->mutable_data(), EstimatedDataEncodedSize());
+    ClearIndices();
+    buffer->Resize(result_size);
+    return buffer;
+  };
+
+  void Put(const T* values, int num_values) override {
+    for (int i = 0; i < num_values; i++) {
+      Put(values[i]);
+    }
   }
 
+  /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
+  /// dict_encoded_size() bytes.
+  void WriteDict(uint8_t* buffer);
+
+  MemPool* mem_pool() { return pool_; }
+
+  /// The number of entries in the dictionary.
+  int num_entries() const { return uniques_.size(); }
+
+ private:
+  MemoryAllocator* allocator_;
+
+  // For ByteArray / FixedLenByteArray data. Not owned
+  MemPool* pool_;
+
   /// Size of the table. Must be a power of 2.
   int hash_table_size_;
 
@@ -218,40 +255,13 @@ class DictEncoderBase {
   //
   // These values correspond to the uniques_ array
   Vector<hash_slot_t> hash_slots_;
-  MemoryAllocator* allocator_;
-
-  // For ByteArray / FixedLenByteArray data. Not owned
-  MemPool* pool_;
 
   /// Indices that have not yet be written out by WriteIndices().
   std::vector<int> buffered_indices_;
 
   /// The number of bytes needed to encode the dictionary.
   int dict_encoded_size_;
-};
 
-template <typename T>
-class DictEncoder : public DictEncoderBase {
- public:
-  explicit DictEncoder(MemPool* pool = nullptr,
-      MemoryAllocator* allocator = default_allocator(), int type_length = -1)
-      : DictEncoderBase(pool, allocator), type_length_(type_length) {}
-
-  // TODO(wesm): think about how to address the construction semantics in
-  // encodings/dictionary-encoding.h
-  void set_mem_pool(MemPool* pool) { pool_ = pool; }
-
-  void set_type_length(int type_length) { type_length_ = type_length; }
-
-  /// Encode value. Note that this does not actually write any data, just
-  /// buffers the value's index to be written later.
-  void Put(const T& value);
-
-  virtual void WriteDict(uint8_t* buffer);
-
-  virtual int num_entries() const { return uniques_.size(); }
-
- private:
   // The unique observed values
   std::vector<T> uniques_;
 
@@ -268,34 +278,35 @@ class DictEncoder : public DictEncoderBase {
   void AddDictKey(const T& value);
 };
 
-template <typename T>
-inline int DictEncoder<T>::Hash(const T& value) const {
+template <typename DType>
+inline int DictEncoder<DType>::Hash(const typename DType::c_type& value) const {
   return HashUtil::Hash(&value, sizeof(value), 0);
 }
 
 template <>
-inline int DictEncoder<ByteArray>::Hash(const ByteArray& value) const {
+inline int DictEncoder<ByteArrayType>::Hash(const ByteArray& value) const {
   return HashUtil::Hash(value.ptr, value.len, 0);
 }
 
 template <>
-inline int DictEncoder<FixedLenByteArray>::Hash(const FixedLenByteArray& value) const {
+inline int DictEncoder<FLBAType>::Hash(const FixedLenByteArray& value) const {
   return HashUtil::Hash(value.ptr, type_length_, 0);
 }
 
-template <typename T>
-inline bool DictEncoder<T>::SlotDifferent(const T& v, hash_slot_t slot) {
+template <typename DType>
+inline bool DictEncoder<DType>::SlotDifferent(
+    const typename DType::c_type& v, hash_slot_t slot) {
   return v != uniques_[slot];
 }
 
 template <>
-inline bool DictEncoder<FixedLenByteArray>::SlotDifferent(
+inline bool DictEncoder<FLBAType>::SlotDifferent(
     const FixedLenByteArray& v, hash_slot_t slot) {
   return 0 != memcmp(v.ptr, uniques_[slot].ptr, type_length_);
 }
 
-template <typename T>
-inline void DictEncoder<T>::Put(const T& v) {
+template <typename DType>
+inline void DictEncoder<DType>::Put(const typename DType::c_type& v) {
   int j = Hash(v) & mod_bitmask_;
   hash_slot_t index = hash_slots_[j];
 
@@ -321,8 +332,8 @@ inline void DictEncoder<T>::Put(const T& v) {
   buffered_indices_.push_back(index);
 }
 
-template <typename T>
-inline void DictEncoder<T>::DoubleTableSize() {
+template <typename DType>
+inline void DictEncoder<DType>::DoubleTableSize() {
   int new_size = hash_table_size_ * 2;
   Vector<hash_slot_t> new_hash_slots(0, allocator_);
   new_hash_slots.Assign(new_size, HASH_SLOT_EMPTY);
@@ -335,7 +346,7 @@ inline void DictEncoder<T>::DoubleTableSize() {
 
     // Compute the hash value mod the new table size to start looking for an
     // empty slot
-    const T& v = uniques_[index];
+    const typename DType::c_type& v = uniques_[index];
 
     // Find an empty slot in the new hash table
     j = Hash(v) & (new_size - 1);
@@ -356,14 +367,14 @@ inline void DictEncoder<T>::DoubleTableSize() {
   hash_slots_.Swap(new_hash_slots);
 }
 
-template <typename T>
-inline void DictEncoder<T>::AddDictKey(const T& v) {
+template <typename DType>
+inline void DictEncoder<DType>::AddDictKey(const typename DType::c_type& v) {
   uniques_.push_back(v);
-  dict_encoded_size_ += sizeof(T);
+  dict_encoded_size_ += sizeof(typename DType::c_type);
 }
 
 template <>
-inline void DictEncoder<ByteArray>::AddDictKey(const ByteArray& v) {
+inline void DictEncoder<ByteArrayType>::AddDictKey(const ByteArray& v) {
   uint8_t* heap = pool_->Allocate(v.len);
   if (UNLIKELY(v.len > 0 && heap == nullptr)) { throw ParquetException("out of memory"); }
   memcpy(heap, v.ptr, v.len);
@@ -372,7 +383,7 @@ inline void DictEncoder<ByteArray>::AddDictKey(const ByteArray& v) {
 }
 
 template <>
-inline void DictEncoder<FixedLenByteArray>::AddDictKey(const FixedLenByteArray& v) {
+inline void DictEncoder<FLBAType>::AddDictKey(const FixedLenByteArray& v) {
   uint8_t* heap = pool_->Allocate(type_length_);
   if (UNLIKELY(type_length_ > 0 && heap == nullptr)) {
     throw ParquetException("out of memory");
@@ -383,15 +394,24 @@ inline void DictEncoder<FixedLenByteArray>::AddDictKey(const FixedLenByteArray&
   dict_encoded_size_ += type_length_;
 }
 
-template <typename T>
-inline void DictEncoder<T>::WriteDict(uint8_t* buffer) {
+template <typename DType>
+inline void DictEncoder<DType>::WriteDict(uint8_t* buffer) {
+  // For primitive types, only a memcpy
+  memcpy(buffer, uniques_.data(), sizeof(typename DType::c_type) * uniques_.size());
+}
+
+template <>
+inline void DictEncoder<BooleanType>::WriteDict(uint8_t* buffer) {
   // For primitive types, only a memcpy
-  memcpy(buffer, &uniques_[0], sizeof(T) * uniques_.size());
+  // memcpy(buffer, uniques_.data(), sizeof(typename DType::c_type) * uniques_.size());
+  for (size_t i = 0; i < uniques_.size(); i++) {
+    buffer[i] = uniques_[i];
+  }
 }
 
 // ByteArray and FLBA already have the dictionary encoded in their data heaps
 template <>
-inline void DictEncoder<ByteArray>::WriteDict(uint8_t* buffer) {
+inline void DictEncoder<ByteArrayType>::WriteDict(uint8_t* buffer) {
   for (const ByteArray& v : uniques_) {
     memcpy(buffer, reinterpret_cast<const void*>(&v.len), sizeof(uint32_t));
     buffer += sizeof(uint32_t);
@@ -401,14 +421,15 @@ inline void DictEncoder<ByteArray>::WriteDict(uint8_t* buffer) {
 }
 
 template <>
-inline void DictEncoder<FixedLenByteArray>::WriteDict(uint8_t* buffer) {
+inline void DictEncoder<FLBAType>::WriteDict(uint8_t* buffer) {
   for (const FixedLenByteArray& v : uniques_) {
     memcpy(buffer, v.ptr, type_length_);
     buffer += type_length_;
   }
 }
 
-inline int DictEncoderBase::WriteIndices(uint8_t* buffer, int buffer_len) {
+template <typename DType>
+inline int DictEncoder<DType>::WriteIndices(uint8_t* buffer, int buffer_len) {
   // Write bit width in first byte
   *buffer = bit_width();
   ++buffer;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
index 8555930..a325ab5 100644
--- a/src/parquet/encodings/encoder.h
+++ b/src/parquet/encodings/encoder.h
@@ -19,12 +19,14 @@
 #define PARQUET_ENCODINGS_ENCODER_H
 
 #include <cstdint>
+#include <memory>
 
 #include "parquet/exception.h"
 #include "parquet/types.h"
 
 namespace parquet {
 
+class Buffer;
 class ColumnDescriptor;
 class OutputStream;
 
@@ -39,7 +41,9 @@ class Encoder {
 
   virtual ~Encoder() {}
 
-  virtual void Encode(const T* src, int num_values, OutputStream* dst) = 0;
+  virtual int64_t EstimatedDataEncodedSize() = 0;
+  virtual std::shared_ptr<Buffer> FlushValues() = 0;
+  virtual void Put(const T* src, int num_values) = 0;
 
   const Encoding::type encoding() const { return encoding_; }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/encodings/encoding-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-benchmark.cc b/src/parquet/encodings/encoding-benchmark.cc
index 43348d8..e62d758 100644
--- a/src/parquet/encodings/encoding-benchmark.cc
+++ b/src/parquet/encodings/encoding-benchmark.cc
@@ -39,8 +39,8 @@ static void BM_PlainEncodingBoolean(::benchmark::State& state) {
   PlainEncoder<BooleanType> encoder(nullptr);
 
   while (state.KeepRunning()) {
-    InMemoryOutputStream dst;
-    encoder.Encode(values, values.size(), &dst);
+    encoder.Put(values, values.size());
+    encoder.FlushValues();
   }
   state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(bool));
 }
@@ -51,9 +51,8 @@ static void BM_PlainDecodingBoolean(::benchmark::State& state) {
   std::vector<bool> values(state.range_x(), 64);
   bool* output = new bool[state.range_x()];
   PlainEncoder<BooleanType> encoder(nullptr);
-  InMemoryOutputStream dst;
-  encoder.Encode(values, values.size(), &dst);
-  std::shared_ptr<Buffer> buf = dst.GetBuffer();
+  encoder.Put(values, values.size());
+  std::shared_ptr<Buffer> buf = encoder.FlushValues();
 
   while (state.KeepRunning()) {
     PlainDecoder<BooleanType> decoder(nullptr);
@@ -72,8 +71,8 @@ static void BM_PlainEncodingInt64(::benchmark::State& state) {
   PlainEncoder<Int64Type> encoder(nullptr);
 
   while (state.KeepRunning()) {
-    InMemoryOutputStream dst;
-    encoder.Encode(values.data(), values.size(), &dst);
+    encoder.Put(values.data(), values.size());
+    encoder.FlushValues();
   }
   state.SetBytesProcessed(state.iterations() * state.range_x() * sizeof(int64_t));
 }
@@ -83,9 +82,8 @@ BENCHMARK(BM_PlainEncodingInt64)->Range(1024, 65536);
 static void BM_PlainDecodingInt64(::benchmark::State& state) {
   std::vector<int64_t> values(state.range_x(), 64);
   PlainEncoder<Int64Type> encoder(nullptr);
-  InMemoryOutputStream dst;
-  encoder.Encode(values.data(), values.size(), &dst);
-  std::shared_ptr<Buffer> buf = dst.GetBuffer();
+  encoder.Put(values.data(), values.size());
+  std::shared_ptr<Buffer> buf = encoder.FlushValues();
 
   while (state.KeepRunning()) {
     PlainDecoder<Int64Type> decoder(nullptr);
@@ -110,7 +108,7 @@ static void DecodeDict(
       std::make_shared<OwnedMutableBuffer>();
   auto indices = std::make_shared<OwnedMutableBuffer>();
 
-  DictEncoder<T> encoder(&pool, allocator, descr->type_length());
+  DictEncoder<Type> encoder(descr.get(), &pool, allocator);
   for (int i = 0; i < num_values; ++i) {
     encoder.Put(values[i]);
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
index bde6f10..50adc9e 100644
--- a/src/parquet/encodings/encoding-test.cc
+++ b/src/parquet/encodings/encoding-test.cc
@@ -50,10 +50,9 @@ TEST(VectorBooleanTest, TestEncodeDecode) {
   PlainEncoder<BooleanType> encoder(nullptr);
   PlainDecoder<BooleanType> decoder(nullptr);
 
-  InMemoryOutputStream dst;
-  encoder.Encode(draws, nvalues, &dst);
+  encoder.Put(draws, nvalues);
 
-  std::shared_ptr<Buffer> encode_buffer = dst.GetBuffer();
+  std::shared_ptr<Buffer> encode_buffer = encoder.FlushValues();
   ASSERT_EQ(nbytes, encode_buffer->size());
 
   vector<uint8_t> decode_buffer(nbytes);
@@ -125,13 +124,14 @@ void VerifyResults<FLBA>(FLBA* result, FLBA* expected, int num_values) {
 // ----------------------------------------------------------------------
 // Create some column descriptors
 
-template <typename T>
+template <typename DType>
 std::shared_ptr<ColumnDescriptor> ExampleDescr() {
-  return nullptr;
+  auto node = schema::PrimitiveNode::Make("name", Repetition::OPTIONAL, DType::type_num);
+  return std::make_shared<ColumnDescriptor>(node, 0, 0);
 }
 
 template <>
-std::shared_ptr<ColumnDescriptor> ExampleDescr<FLBA>() {
+std::shared_ptr<ColumnDescriptor> ExampleDescr<FLBAType>() {
   auto node = schema::PrimitiveNode::Make("name", Repetition::OPTIONAL,
       Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, flba_length, 10, 2);
   return std::make_shared<ColumnDescriptor>(node, 0, 0);
@@ -147,8 +147,8 @@ class TestEncodingBase : public ::testing::Test {
   static constexpr int TYPE = Type::type_num;
 
   void SetUp() {
-    descr_ = ExampleDescr<T>();
-    if (descr_) { type_length_ = descr_->type_length(); }
+    descr_ = ExampleDescr<Type>();
+    type_length_ = descr_->type_length();
     allocator_ = default_allocator();
   }
 
@@ -215,10 +215,8 @@ class TestPlainEncoding : public TestEncodingBase<Type> {
   virtual void CheckRoundtrip() {
     PlainEncoder<Type> encoder(descr_.get());
     PlainDecoder<Type> decoder(descr_.get());
-    InMemoryOutputStream dst;
-    encoder.Encode(draws_, num_values_, &dst);
-
-    encode_buffer_ = dst.GetBuffer();
+    encoder.Put(draws_, num_values_);
+    encode_buffer_ = encoder.FlushValues();
 
     decoder.SetData(num_values_, encode_buffer_->data(), encode_buffer_->size());
     int values_decoded = decoder.Decode(decode_buf_, num_values_);
@@ -249,22 +247,15 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
   static constexpr int TYPE = Type::type_num;
 
   void CheckRoundtrip() {
-    DictEncoder<T> encoder(&pool_, allocator_, type_length_);
+    DictEncoder<Type> encoder(descr_.get(), &pool_);
 
     dict_buffer_ = std::make_shared<OwnedMutableBuffer>();
-    auto indices = std::make_shared<OwnedMutableBuffer>();
 
-    ASSERT_NO_THROW({
-      for (int i = 0; i < num_values_; ++i) {
-        encoder.Put(draws_[i]);
-      }
-    });
+    ASSERT_NO_THROW(encoder.Put(draws_, num_values_));
     dict_buffer_->Resize(encoder.dict_encoded_size());
     encoder.WriteDict(dict_buffer_->mutable_data());
 
-    indices->Resize(encoder.EstimatedDataEncodedSize());
-    int actual_bytes = encoder.WriteIndices(indices->mutable_data(), indices->size());
-    indices->Resize(actual_bytes);
+    std::shared_ptr<Buffer> indices = encoder.FlushValues();
 
     PlainDecoder<Type> dict_decoder(descr_.get());
     dict_decoder.SetData(

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index c169688..eee3f65 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -163,9 +163,16 @@ class PlainEncoder : public Encoder<DType> {
 
   explicit PlainEncoder(
       const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
-      : Encoder<DType>(descr, Encoding::PLAIN, allocator) {}
+      : Encoder<DType>(descr, Encoding::PLAIN, allocator),
+        values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {}
 
-  void Encode(const T* src, int num_values, OutputStream* dst) override;
+  int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); }
+
+  std::shared_ptr<Buffer> FlushValues() override;
+  void Put(const T* src, int num_values) override;
+
+ protected:
+  std::shared_ptr<InMemoryOutputStream> values_sink_;
 };
 
 template <>
@@ -173,9 +180,27 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
  public:
   explicit PlainEncoder(
       const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
-      : Encoder<BooleanType>(descr, Encoding::PLAIN, allocator) {}
+      : Encoder<BooleanType>(descr, Encoding::PLAIN, allocator),
+        values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {}
+
+  int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); }
+
+  std::shared_ptr<Buffer> FlushValues() override {
+    std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
+    values_sink_.reset(
+        new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_));
+    return buffer;
+  }
+
+  void Put(const bool* src, int num_values) override {
+    Encode(src, num_values, values_sink_.get());
+  }
 
-  virtual void Encode(const bool* src, int num_values, OutputStream* dst) {
+  void Put(const std::vector<bool>& src, int num_values) {
+    Encode(src, num_values, values_sink_.get());
+  }
+
+  void Encode(const bool* src, int num_values, OutputStream* dst) {
     int bytes_required = BitUtil::Ceil(num_values, 8);
     OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);
 
@@ -207,30 +232,39 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
     // Write the result to the output stream
     dst->Write(bit_writer.buffer(), bit_writer.bytes_written());
   }
+
+ protected:
+  std::shared_ptr<InMemoryOutputStream> values_sink_;
 };
 
 template <typename DType>
-inline void PlainEncoder<DType>::Encode(
-    const T* buffer, int num_values, OutputStream* dst) {
-  dst->Write(reinterpret_cast<const uint8_t*>(buffer), num_values * sizeof(T));
+inline std::shared_ptr<Buffer> PlainEncoder<DType>::FlushValues() {
+  std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
+  values_sink_.reset(
+      new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_));
+  return buffer;
+}
+
+template <typename DType>
+inline void PlainEncoder<DType>::Put(const T* buffer, int num_values) {
+  values_sink_->Write(reinterpret_cast<const uint8_t*>(buffer), num_values * sizeof(T));
 }
 
 template <>
-inline void PlainEncoder<ByteArrayType>::Encode(
-    const ByteArray* src, int num_values, OutputStream* dst) {
+inline void PlainEncoder<ByteArrayType>::Put(const ByteArray* src, int num_values) {
   for (int i = 0; i < num_values; ++i) {
     // Write the result to the output stream
-    dst->Write(reinterpret_cast<const uint8_t*>(&src[i].len), sizeof(uint32_t));
-    dst->Write(reinterpret_cast<const uint8_t*>(src[i].ptr), src[i].len);
+    values_sink_->Write(reinterpret_cast<const uint8_t*>(&src[i].len), sizeof(uint32_t));
+    values_sink_->Write(reinterpret_cast<const uint8_t*>(src[i].ptr), src[i].len);
   }
 }
 
 template <>
-inline void PlainEncoder<FLBAType>::Encode(
-    const FixedLenByteArray* src, int num_values, OutputStream* dst) {
+inline void PlainEncoder<FLBAType>::Put(const FixedLenByteArray* src, int num_values) {
   for (int i = 0; i < num_values; ++i) {
     // Write the result to the output stream
-    dst->Write(reinterpret_cast<const uint8_t*>(src[i].ptr), descr_->type_length());
+    values_sink_->Write(
+        reinterpret_cast<const uint8_t*>(src[i].ptr), descr_->type_length());
   }
 }
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 0716325..f07b44b 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -35,7 +35,10 @@ static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
 
 SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type codec,
     format::ColumnChunk* metadata, MemoryAllocator* allocator)
-    : sink_(sink), metadata_(metadata), allocator_(allocator) {
+    : sink_(sink),
+      metadata_(metadata),
+      // allocator_(allocator),
+      compression_buffer_(std::make_shared<OwnedMutableBuffer>(0, allocator)) {
   compressor_ = Codec::Create(codec);
   // Currently we directly start with the data page
   metadata_->meta_data.__set_data_page_offset(sink_->Tell());
@@ -52,64 +55,76 @@ void SerializedPageWriter::AddEncoding(Encoding::type encoding) {
   }
 }
 
-int64_t SerializedPageWriter::WriteDataPage(int32_t num_rows, int32_t num_values,
-    const std::shared_ptr<Buffer>& definition_levels,
-    Encoding::type definition_level_encoding,
-    const std::shared_ptr<Buffer>& repetition_levels,
-    Encoding::type repetition_level_encoding, const std::shared_ptr<Buffer>& values,
-    Encoding::type encoding) {
-  int64_t uncompressed_size =
-      definition_levels->size() + repetition_levels->size() + values->size();
-
-  // Concatenate data into a single buffer
-  // TODO: In the uncompressed case, directly write this to the sink
-  // TODO: Reuse the (un)compressed_data buffer instead of recreating it each time.
-  std::shared_ptr<OwnedMutableBuffer> uncompressed_data =
-      std::make_shared<OwnedMutableBuffer>(uncompressed_size, allocator_);
-  uint8_t* uncompressed_ptr = uncompressed_data->mutable_data();
-  memcpy(uncompressed_ptr, repetition_levels->data(), repetition_levels->size());
-  uncompressed_ptr += repetition_levels->size();
-  memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size());
-  uncompressed_ptr += definition_levels->size();
-  memcpy(uncompressed_ptr, values->data(), values->size());
+std::shared_ptr<Buffer> SerializedPageWriter::Compress(
+    const std::shared_ptr<Buffer>& buffer) {
+  // Fast path, no compressor available.
+  if (!compressor_) return buffer;
 
   // Compress the data
-  int64_t compressed_size = uncompressed_size;
-  std::shared_ptr<OwnedMutableBuffer> compressed_data = uncompressed_data;
-  if (compressor_) {
-    const uint8_t* uncompressed_ptr = uncompressed_data->data();
-    int64_t max_compressed_size =
-        compressor_->MaxCompressedLen(uncompressed_size, uncompressed_ptr);
-    compressed_data =
-        std::make_shared<OwnedMutableBuffer>(max_compressed_size, allocator_);
-    compressed_size = compressor_->Compress(uncompressed_size, uncompressed_ptr,
-        max_compressed_size, compressed_data->mutable_data());
-  }
-  // Uncompressed data is not needed anymore, so immediately get rid of it.
-  uncompressed_data.reset();
+  int64_t max_compressed_size =
+      compressor_->MaxCompressedLen(buffer->size(), buffer->data());
+  compression_buffer_->Resize(max_compressed_size);
+  int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(),
+      max_compressed_size, compression_buffer_->mutable_data());
+  compression_buffer_->Resize(compressed_size);
+  return compression_buffer_;
+}
+
+int64_t SerializedPageWriter::WriteDataPage(const DataPage& page) {
+  int64_t uncompressed_size = page.size();
+  std::shared_ptr<Buffer> compressed_data = Compress(page.buffer());
 
   format::DataPageHeader data_page_header;
-  data_page_header.__set_num_values(num_rows);
-  data_page_header.__set_encoding(ToThrift(encoding));
-  data_page_header.__set_definition_level_encoding(ToThrift(definition_level_encoding));
-  data_page_header.__set_repetition_level_encoding(ToThrift(repetition_level_encoding));
+  data_page_header.__set_num_values(page.num_values());
+  data_page_header.__set_encoding(ToThrift(page.encoding()));
+  data_page_header.__set_definition_level_encoding(
+      ToThrift(page.definition_level_encoding()));
+  data_page_header.__set_repetition_level_encoding(
+      ToThrift(page.repetition_level_encoding()));
   // TODO(PARQUET-593) statistics
 
   format::PageHeader page_header;
   page_header.__set_type(format::PageType::DATA_PAGE);
   page_header.__set_uncompressed_page_size(uncompressed_size);
-  page_header.__set_compressed_page_size(compressed_size);
+  page_header.__set_compressed_page_size(compressed_data->size());
   page_header.__set_data_page_header(data_page_header);
   // TODO(PARQUET-594) crc checksum
 
   int64_t start_pos = sink_->Tell();
   SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
   int64_t header_size = sink_->Tell() - start_pos;
-  sink_->Write(compressed_data->data(), compressed_size);
+  sink_->Write(compressed_data->data(), compressed_data->size());
+
+  metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size;
+  metadata_->meta_data.total_compressed_size += compressed_data->size() + header_size;
+  metadata_->meta_data.num_values += page.num_values();
+
+  return sink_->Tell() - start_pos;
+}
+
+int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
+  int64_t uncompressed_size = page.size();
+  std::shared_ptr<Buffer> compressed_data = Compress(page.buffer());
+
+  format::DictionaryPageHeader dict_page_header;
+  dict_page_header.__set_num_values(page.num_values());
+  dict_page_header.__set_encoding(ToThrift(page.encoding()));
+  dict_page_header.__set_is_sorted(page.is_sorted());
+
+  format::PageHeader page_header;
+  page_header.__set_type(format::PageType::DICTIONARY_PAGE);
+  page_header.__set_uncompressed_page_size(uncompressed_size);
+  page_header.__set_compressed_page_size(compressed_data->size());
+  page_header.__set_dictionary_page_header(dict_page_header);
+  // TODO(PARQUET-594) crc checksum
+
+  int64_t start_pos = sink_->Tell();
+  SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
+  int64_t header_size = sink_->Tell() - start_pos;
+  sink_->Write(compressed_data->data(), compressed_data->size());
 
   metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size;
-  metadata_->meta_data.total_compressed_size += compressed_size + header_size;
-  metadata_->meta_data.num_values += num_values;
+  metadata_->meta_data.total_compressed_size += compressed_data->size() + header_size;
 
   return sink_->Tell() - start_pos;
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 29b326f..7c46408 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -32,7 +32,6 @@ namespace parquet {
 // by a serialized Thrift format::PageHeader indicating the type of each page
 // and the page metadata.
 //
-// TODO: Currently only writes DataPage pages.
 class SerializedPageWriter : public PageWriter {
  public:
   SerializedPageWriter(OutputStream* sink, Compression::type codec,
@@ -40,27 +39,29 @@ class SerializedPageWriter : public PageWriter {
 
   virtual ~SerializedPageWriter() {}
 
-  // TODO Refactor that this just takes a DataPage instance.
-  // For this we need to be clear how to handle num_rows and num_values
-  int64_t WriteDataPage(int32_t num_rows, int32_t num_values,
-      const std::shared_ptr<Buffer>& definition_levels,
-      Encoding::type definition_level_encoding,
-      const std::shared_ptr<Buffer>& repetition_levels,
-      Encoding::type repetition_level_encoding, const std::shared_ptr<Buffer>& values,
-      Encoding::type encoding) override;
+  int64_t WriteDataPage(const DataPage& page) override;
+
+  int64_t WriteDictionaryPage(const DictionaryPage& page) override;
 
   void Close() override;
 
  private:
   OutputStream* sink_;
   format::ColumnChunk* metadata_;
-  MemoryAllocator* allocator_;
+  // MemoryAllocator* allocator_;
 
   // Compression codec to use.
   std::unique_ptr<Codec> compressor_;
-  OwnedMutableBuffer compression_buffer_;
+  std::shared_ptr<OwnedMutableBuffer> compression_buffer_;
 
   void AddEncoding(Encoding::type encoding);
+  /**
+   * Compress a buffer.
+   *
+   * This method may return compression_buffer_ and thus the resulting memory
+   * is only valid until the next call to Compress().
+   */
+  std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer);
 };
 
 // RowGroupWriter::Contents implementation for the Parquet file specification

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/util/mem-pool.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.cc b/src/parquet/util/mem-pool.cc
index b211bee..1ab40bc 100644
--- a/src/parquet/util/mem-pool.cc
+++ b/src/parquet/util/mem-pool.cc
@@ -28,6 +28,7 @@
 #include <string>
 
 #include "parquet/util/bit-util.h"
+#include "parquet/util/logging.h"
 
 namespace parquet {
 
@@ -55,6 +56,40 @@ MemPool::~MemPool() {
   DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
 }
 
+// Gives back 'byte_size' bytes from the tail of the current chunk, shrinking
+// both the chunk's allocated_bytes and the pool-wide running total. Per the
+// header contract this may only undo all or part of the most recent Allocate().
+void MemPool::ReturnPartialAllocation(int byte_size) {
+  DCHECK_GE(byte_size, 0);
+  // A return is only meaningful after at least one allocation picked a chunk.
+  DCHECK(current_chunk_idx_ != -1);
+  ChunkInfo& info = chunks_[current_chunk_idx_];
+  DCHECK_GE(info.allocated_bytes, byte_size);
+  info.allocated_bytes -= byte_size;
+  total_allocated_bytes_ -= byte_size;
+}
+
+// Hands out an 8-byte-aligned region of 'size' bytes from the current chunk,
+// asking FindChunk() for a new chunk when the current one cannot fit the
+// rounded-up request. Returns NULL for size == 0 or when no chunk is available.
+// NOTE(review): CHECK_LIMIT_FIRST is never read in this body — presumably the
+// limit check was dropped when this code was imported; confirm before relying
+// on the <true> instantiation. Defining the template here in the .cc is only
+// safe because the sole instantiation (Allocate<false>, below) is in this TU.
+template <bool CHECK_LIMIT_FIRST>
+uint8_t* MemPool::Allocate(int size) {
+  if (size == 0) return NULL;
+
+  // All allocations are rounded up to a multiple of 8 to keep results aligned.
+  int64_t num_bytes = BitUtil::RoundUp(size, 8);
+  if (current_chunk_idx_ == -1 ||
+      num_bytes + chunks_[current_chunk_idx_].allocated_bytes >
+          chunks_[current_chunk_idx_].size) {
+    // If we couldn't allocate a new chunk, return NULL.
+    if (UNLIKELY(!FindChunk(num_bytes))) return NULL;
+  }
+  // Carve the region off the end of the chosen chunk and advance the counters.
+  ChunkInfo& info = chunks_[current_chunk_idx_];
+  uint8_t* result = info.data + info.allocated_bytes;
+  DCHECK_LE(info.allocated_bytes + num_bytes, info.size);
+  info.allocated_bytes += num_bytes;
+  total_allocated_bytes_ += num_bytes;
+  DCHECK_LE(current_chunk_idx_, static_cast<int>(chunks_.size()) - 1);
+  peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
+  return result;
+}
+
+// Public non-template entry point declared in mem-pool.h; forwards to the
+// templated implementation above with the limit check disabled.
+uint8_t* MemPool::Allocate(int size) {
+  return Allocate<false>(size);
+}
+
 void MemPool::Clear() {
   current_chunk_idx_ = -1;
   for (auto chunk = chunks_.begin(); chunk != chunks_.end(); ++chunk) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/0a0323d8/src/parquet/util/mem-pool.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.h b/src/parquet/util/mem-pool.h
index b64ee29..496665d 100644
--- a/src/parquet/util/mem-pool.h
+++ b/src/parquet/util/mem-pool.h
@@ -27,8 +27,6 @@
 #include <vector>
 #include <string>
 
-#include "parquet/util/logging.h"
-#include "parquet/util/bit-util.h"
 #include "parquet/util/mem-allocator.h"
 
 namespace parquet {
@@ -85,19 +83,12 @@ class MemPool {
   /// Allocates 8-byte aligned section of memory of 'size' bytes at the end
  /// of the current chunk. Creates a new chunk if there aren't any chunks
   /// with enough capacity.
-  uint8_t* Allocate(int size) { return Allocate<false>(size); }
+  uint8_t* Allocate(int size);
 
  /// Returns 'byte_size' bytes from the current chunk back to the mem pool. This can
   /// only be used to return either all or part of the previous allocation returned
   /// by Allocate().
-  void ReturnPartialAllocation(int byte_size) {
-    DCHECK_GE(byte_size, 0);
-    DCHECK(current_chunk_idx_ != -1);
-    ChunkInfo& info = chunks_[current_chunk_idx_];
-    DCHECK_GE(info.allocated_bytes, byte_size);
-    info.allocated_bytes -= byte_size;
-    total_allocated_bytes_ -= byte_size;
-  }
+  void ReturnPartialAllocation(int byte_size);
 
   /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
   void Clear();
@@ -180,25 +171,7 @@ class MemPool {
   }
 
   template <bool CHECK_LIMIT_FIRST>
-  uint8_t* Allocate(int size) {
-    if (size == 0) return NULL;
-
-    int64_t num_bytes = BitUtil::RoundUp(size, 8);
-    if (current_chunk_idx_ == -1 ||
-        num_bytes + chunks_[current_chunk_idx_].allocated_bytes >
-            chunks_[current_chunk_idx_].size) {
-      // If we couldn't allocate a new chunk, return NULL.
-      if (UNLIKELY(!FindChunk(num_bytes))) return NULL;
-    }
-    ChunkInfo& info = chunks_[current_chunk_idx_];
-    uint8_t* result = info.data + info.allocated_bytes;
-    DCHECK_LE(info.allocated_bytes + num_bytes, info.size);
-    info.allocated_bytes += num_bytes;
-    total_allocated_bytes_ += num_bytes;
-    DCHECK_LE(current_chunk_idx_, static_cast<int>(chunks_.size()) - 1);
-    peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
-    return result;
-  }
+  uint8_t* Allocate(int size);
 };
 
 }  // namespace parquet


Mime
View raw message