parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [parquet-cpp] branch master updated: PARQUET-1372: Add an API to allow writing RowGroups based on size
Date Sat, 25 Aug 2018 11:33:41 GMT
This is an automated email from the ASF dual-hosted git repository.

uwe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 80e110c  PARQUET-1372:  Add an API to allow writing RowGroups based on size
80e110c is described below

commit 80e110c823c5631ce4a4f0a5da486e759219f1e3
Author: Deepak Majeti <deepak.majeti@hpe.com>
AuthorDate: Sat Aug 25 13:33:35 2018 +0200

    PARQUET-1372:  Add an API to allow writing RowGroups based on size
    
    I split the changes into multiple commits to ease the review.
    Used the example program to test the new API.
    I will add unit tests once we converge on the API after review.
    Thanks to @anatolishein for collaborating with the API design.
    
    Author: Deepak Majeti <deepak.majeti@hpe.com>
    
    Closes #484 from majetideepak/PARQUET-1372 and squashes the following commits:
    
    143ed51 [Deepak Majeti] improve comments
    c10fe08 [Deepak Majeti] Add test
    d12b10b [Deepak Majeti] Review comments
    cb99b3f [Deepak Majeti] fix compiler warnings
    e179a4c [Deepak Majeti] add example header
    710bbe0 [Deepak Majeti] clang format
    9e03004 [Deepak Majeti] reorg examples
    410a3af [Deepak Majeti] remove flush_on_close
    e148817 [Deepak Majeti] add BufferedPageWriter
    26a52c1 [Deepak Majeti] clang format
    20049c0 [Deepak Majeti] modify examples
    9db26a2 [Deepak Majeti] Combine RowGroupWriter2 with RowGroupWriter
    cb7d69c [Deepak Majeti] fix compiler errors
    21642b3 [Deepak Majeti] clang format
    530b835 [Deepak Majeti] example for RowGroupWriter2
    0fc1f5c [Deepak Majeti] Extend Column Writer to flush pages on Close
    f2f420d [Deepak Majeti] RowGroupWriter2, implementation that writes all columns at once
---
 CMakeLists.txt                                     |   4 +-
 examples/low-level-api/CMakeLists.txt              |   4 +
 examples/low-level-api/reader-writer.cc            |  60 +---
 .../{reader-writer.cc => reader-writer2.cc}        | 313 ++++++++++-----------
 examples/low-level-api/reader_writer.h             |  71 +++++
 src/parquet/arrow/test-util.h                      |   7 +-
 src/parquet/column_writer-test.cc                  |   1 -
 src/parquet/column_writer.cc                       |  74 ++++-
 src/parquet/column_writer.h                        |  17 +-
 src/parquet/file-serialize-test.cc                 |  68 ++++-
 src/parquet/file_writer.cc                         | 153 ++++++++--
 src/parquet/file_writer.h                          |  33 ++-
 12 files changed, 531 insertions(+), 274 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt
index 5b3c460..698f6d7 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -458,7 +458,7 @@ add_custom_target(format-example
   ${BUILD_SUPPORT_DIR}/run_clang_format.py
   ${CLANG_FORMAT_BIN}
   ${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt
-  ${CMAKE_CURRENT_SOURCE_DIR}/examples/parquet-arrow)
+  ${CMAKE_CURRENT_SOURCE_DIR}/examples)
 
 add_custom_target(format
   DEPENDS format-example
@@ -474,7 +474,7 @@ add_custom_target(format
 add_custom_target(check-format-examples ${BUILD_SUPPORT_DIR}/run_clang_format.py
    ${CLANG_FORMAT_BIN}
    ${BUILD_SUPPORT_DIR}/clang_format_exclusions.txt
-   ${CMAKE_CURRENT_SOURCE_DIR}/examples/parquet-arrow 1)
+   ${CMAKE_CURRENT_SOURCE_DIR}/examples 1)
 add_custom_target(check-format
    DEPENDS check-format-examples
    COMMAND
diff --git a/examples/low-level-api/CMakeLists.txt b/examples/low-level-api/CMakeLists.txt
index 721fa9a..64ba110 100644
--- a/examples/low-level-api/CMakeLists.txt
+++ b/examples/low-level-api/CMakeLists.txt
@@ -17,5 +17,9 @@
 
 if (PARQUET_BUILD_EXECUTABLES)
   add_executable(reader-writer reader-writer.cc)
+  add_executable(reader-writer2 reader-writer2.cc)
+  target_include_directories(reader-writer PRIVATE .)
+  target_include_directories(reader-writer2 PRIVATE .)
   target_link_libraries(reader-writer parquet_static)
+  target_link_libraries(reader-writer2 parquet_static)
 endif()
diff --git a/examples/low-level-api/reader-writer.cc b/examples/low-level-api/reader-writer.cc
index fb2ec77..09cd137 100644
--- a/examples/low-level-api/reader-writer.cc
+++ b/examples/low-level-api/reader-writer.cc
@@ -18,19 +18,16 @@
 #include <cassert>
 #include <fstream>
 #include <iostream>
-#include <list>
 #include <memory>
 
-#include <arrow/io/file.h>
-#include <arrow/util/logging.h>
-
-#include <parquet/api/reader.h>
-#include <parquet/api/writer.h>
+#include <reader_writer.h>
 
 /*
  * This example describes writing and reading Parquet Files in C++ and serves as a
  * reference to the API.
  * The file contains all the physical data types supported by Parquet.
+ * This example uses the RowGroupWriter API that supports writing RowGroups optimized for
+ * memory consumption.
  **/
 
 /* Parquet is a structured columnar file format
@@ -46,56 +43,8 @@
  **/
 
 constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
-constexpr int FIXED_LENGTH = 10;
 const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet";
 
-using parquet::Repetition;
-using parquet::Type;
-using parquet::LogicalType;
-using parquet::schema::PrimitiveNode;
-using parquet::schema::GroupNode;
-
-static std::shared_ptr<GroupNode> SetupSchema() {
-  parquet::schema::NodeVector fields;
-  // Create a primitive node named 'boolean_field' with type:BOOLEAN,
-  // repetition:REQUIRED
-  fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED,
-                                       Type::BOOLEAN, LogicalType::NONE));
-
-  // Create a primitive node named 'int32_field' with type:INT32, repetition:REQUIRED,
-  // logical type:TIME_MILLIS
-  fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED, Type::INT32,
-                                       LogicalType::TIME_MILLIS));
-
-  // Create a primitive node named 'int64_field' with type:INT64, repetition:REPEATED
-  fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED, Type::INT64,
-                                       LogicalType::NONE));
-
-  fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED, Type::INT96,
-                                       LogicalType::NONE));
-
-  fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED, Type::FLOAT,
-                                       LogicalType::NONE));
-
-  fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED, Type::DOUBLE,
-                                       LogicalType::NONE));
-
-  // Create a primitive node named 'ba_field' with type:BYTE_ARRAY, repetition:OPTIONAL
-  fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL, Type::BYTE_ARRAY,
-                                       LogicalType::NONE));
-
-  // Create a primitive node named 'flba_field' with type:FIXED_LEN_BYTE_ARRAY,
-  // repetition:REQUIRED, field_length = FIXED_LENGTH
-  fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED,
-                                       Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE,
-                                       FIXED_LENGTH));
-
-  // Create a GroupNode named 'schema' using the primitive nodes defined above
-  // This GroupNode is the root node of the schema tree
-  return std::static_pointer_cast<GroupNode>(
-      GroupNode::Make("schema", Repetition::REQUIRED, fields));
-}
-
 int main(int argc, char** argv) {
   /**********************************************************************************
                              PARQUET WRITER EXAMPLE
@@ -122,8 +71,7 @@ int main(int argc, char** argv) {
         parquet::ParquetFileWriter::Open(out_file, schema, props);
 
     // Append a RowGroup with a specific number of rows.
-    parquet::RowGroupWriter* rg_writer =
-        file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP);
+    parquet::RowGroupWriter* rg_writer = file_writer->AppendRowGroup();
 
     // Write the Bool column
     parquet::BoolWriter* bool_writer =
diff --git a/examples/low-level-api/reader-writer.cc b/examples/low-level-api/reader-writer2.cc
similarity index 62%
copy from examples/low-level-api/reader-writer.cc
copy to examples/low-level-api/reader-writer2.cc
index fb2ec77..dded5fa 100644
--- a/examples/low-level-api/reader-writer.cc
+++ b/examples/low-level-api/reader-writer2.cc
@@ -18,19 +18,16 @@
 #include <cassert>
 #include <fstream>
 #include <iostream>
-#include <list>
 #include <memory>
 
-#include <arrow/io/file.h>
-#include <arrow/util/logging.h>
-
-#include <parquet/api/reader.h>
-#include <parquet/api/writer.h>
+#include <reader_writer.h>
 
 /*
  * This example describes writing and reading Parquet Files in C++ and serves as a
  * reference to the API.
  * The file contains all the physical data types supported by Parquet.
+ * This example uses the RowGroupWriter API that supports writing RowGroups based on a
+ * certain size.
  **/
 
 /* Parquet is a structured columnar file format
@@ -45,56 +42,9 @@
  * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md
  **/
 
-constexpr int NUM_ROWS_PER_ROW_GROUP = 500;
-constexpr int FIXED_LENGTH = 10;
-const char PARQUET_FILENAME[] = "parquet_cpp_example.parquet";
-
-using parquet::Repetition;
-using parquet::Type;
-using parquet::LogicalType;
-using parquet::schema::PrimitiveNode;
-using parquet::schema::GroupNode;
-
-static std::shared_ptr<GroupNode> SetupSchema() {
-  parquet::schema::NodeVector fields;
-  // Create a primitive node named 'boolean_field' with type:BOOLEAN,
-  // repetition:REQUIRED
-  fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED,
-                                       Type::BOOLEAN, LogicalType::NONE));
-
-  // Create a primitive node named 'int32_field' with type:INT32, repetition:REQUIRED,
-  // logical type:TIME_MILLIS
-  fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED, Type::INT32,
-                                       LogicalType::TIME_MILLIS));
-
-  // Create a primitive node named 'int64_field' with type:INT64, repetition:REPEATED
-  fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED, Type::INT64,
-                                       LogicalType::NONE));
-
-  fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED, Type::INT96,
-                                       LogicalType::NONE));
-
-  fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED, Type::FLOAT,
-                                       LogicalType::NONE));
-
-  fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED, Type::DOUBLE,
-                                       LogicalType::NONE));
-
-  // Create a primitive node named 'ba_field' with type:BYTE_ARRAY, repetition:OPTIONAL
-  fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL, Type::BYTE_ARRAY,
-                                       LogicalType::NONE));
-
-  // Create a primitive node named 'flba_field' with type:FIXED_LEN_BYTE_ARRAY,
-  // repetition:REQUIRED, field_length = FIXED_LENGTH
-  fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED,
-                                       Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE,
-                                       FIXED_LENGTH));
-
-  // Create a GroupNode named 'schema' using the primitive nodes defined above
-  // This GroupNode is the root node of the schema tree
-  return std::static_pointer_cast<GroupNode>(
-      GroupNode::Make("schema", Repetition::REQUIRED, fields));
-}
+constexpr int NUM_ROWS = 2500000;
+constexpr int64_t ROW_GROUP_SIZE = 16 * 1024 * 1024;  // 16 MB
+const char PARQUET_FILENAME[] = "parquet_cpp_example2.parquet";
 
 int main(int argc, char** argv) {
   /**********************************************************************************
@@ -121,99 +71,118 @@ int main(int argc, char** argv) {
     std::shared_ptr<parquet::ParquetFileWriter> file_writer =
         parquet::ParquetFileWriter::Open(out_file, schema, props);
 
-    // Append a RowGroup with a specific number of rows.
-    parquet::RowGroupWriter* rg_writer =
-        file_writer->AppendRowGroup(NUM_ROWS_PER_ROW_GROUP);
+    // Append a BufferedRowGroup to keep the RowGroup open until a certain size
+    parquet::RowGroupWriter* rg_writer = file_writer->AppendBufferedRowGroup();
 
-    // Write the Bool column
-    parquet::BoolWriter* bool_writer =
-        static_cast<parquet::BoolWriter*>(rg_writer->NextColumn());
-    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
-      bool value = ((i % 2) == 0) ? true : false;
-      bool_writer->WriteBatch(1, nullptr, nullptr, &value);
-    }
+    int num_columns = file_writer->num_columns();
+    std::vector<int64_t> buffered_values_estimate(num_columns, 0);
+    for (int i = 0; i < NUM_ROWS; i++) {
+      int64_t estimated_bytes = 0;
+      // Get the estimated size of the values that are not written to a page yet
+      for (int n = 0; n < num_columns; n++) {
+        estimated_bytes += buffered_values_estimate[n];
+      }
 
-    // Write the Int32 column
-    parquet::Int32Writer* int32_writer =
-        static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
-    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
-      int32_t value = i;
-      int32_writer->WriteBatch(1, nullptr, nullptr, &value);
-    }
+      // We need to consider the compressed pages
+      // as well as the values that are not compressed yet
+      if ((rg_writer->total_bytes_written() + rg_writer->total_compressed_bytes() +
+           estimated_bytes) > ROW_GROUP_SIZE) {
+        rg_writer->Close();
+        std::fill(buffered_values_estimate.begin(), buffered_values_estimate.end(), 0);
+        rg_writer = file_writer->AppendBufferedRowGroup();
+      }
 
-    // Write the Int64 column. Each row has repeats twice.
-    parquet::Int64Writer* int64_writer =
-        static_cast<parquet::Int64Writer*>(rg_writer->NextColumn());
-    for (int i = 0; i < 2 * NUM_ROWS_PER_ROW_GROUP; i++) {
-      int64_t value = i * 1000 * 1000;
-      value *= 1000 * 1000;
+      int col_id = 0;
+      // Write the Bool column
+      parquet::BoolWriter* bool_writer =
+          static_cast<parquet::BoolWriter*>(rg_writer->column(col_id));
+      bool bool_value = ((i % 2) == 0) ? true : false;
+      bool_writer->WriteBatch(1, nullptr, nullptr, &bool_value);
+      buffered_values_estimate[col_id] = bool_writer->EstimatedBufferedValueBytes();
+
+      // Write the Int32 column
+      col_id++;
+      parquet::Int32Writer* int32_writer =
+          static_cast<parquet::Int32Writer*>(rg_writer->column(col_id));
+      int32_t int32_value = i;
+      int32_writer->WriteBatch(1, nullptr, nullptr, &int32_value);
+      buffered_values_estimate[col_id] = int32_writer->EstimatedBufferedValueBytes();
+
+      // Write the Int64 column. Each row has two repeated values.
+      col_id++;
+      parquet::Int64Writer* int64_writer =
+          static_cast<parquet::Int64Writer*>(rg_writer->column(col_id));
+      int64_t int64_value1 = 2 * i;
       int16_t definition_level = 1;
       int16_t repetition_level = 0;
-      if ((i % 2) == 0) {
-        repetition_level = 1;  // start of a new record
-      }
-      int64_writer->WriteBatch(1, &definition_level, &repetition_level, &value);
-    }
-
-    // Write the INT96 column.
-    parquet::Int96Writer* int96_writer =
-        static_cast<parquet::Int96Writer*>(rg_writer->NextColumn());
-    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
-      parquet::Int96 value;
-      value.value[0] = i;
-      value.value[1] = i + 1;
-      value.value[2] = i + 2;
-      int96_writer->WriteBatch(1, nullptr, nullptr, &value);
-    }
-
-    // Write the Float column
-    parquet::FloatWriter* float_writer =
-        static_cast<parquet::FloatWriter*>(rg_writer->NextColumn());
-    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
-      float value = static_cast<float>(i) * 1.1f;
-      float_writer->WriteBatch(1, nullptr, nullptr, &value);
-    }
-
-    // Write the Double column
-    parquet::DoubleWriter* double_writer =
-        static_cast<parquet::DoubleWriter*>(rg_writer->NextColumn());
-    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
-      double value = i * 1.1111111;
-      double_writer->WriteBatch(1, nullptr, nullptr, &value);
-    }
-
-    // Write the ByteArray column. Make every alternate values NULL
-    parquet::ByteArrayWriter* ba_writer =
-        static_cast<parquet::ByteArrayWriter*>(rg_writer->NextColumn());
-    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
-      parquet::ByteArray value;
+      int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value1);
+      int64_t int64_value2 = (2 * i + 1);
+      repetition_level = 1;  // repeated value within the same record
+      int64_writer->WriteBatch(1, &definition_level, &repetition_level, &int64_value2);
+      buffered_values_estimate[col_id] = int64_writer->EstimatedBufferedValueBytes();
+
+      // Write the INT96 column.
+      col_id++;
+      parquet::Int96Writer* int96_writer =
+          static_cast<parquet::Int96Writer*>(rg_writer->column(col_id));
+      parquet::Int96 int96_value;
+      int96_value.value[0] = i;
+      int96_value.value[1] = i + 1;
+      int96_value.value[2] = i + 2;
+      int96_writer->WriteBatch(1, nullptr, nullptr, &int96_value);
+      buffered_values_estimate[col_id] = int96_writer->EstimatedBufferedValueBytes();
+
+      // Write the Float column
+      col_id++;
+      parquet::FloatWriter* float_writer =
+          static_cast<parquet::FloatWriter*>(rg_writer->column(col_id));
+      float float_value = static_cast<float>(i) * 1.1f;
+      float_writer->WriteBatch(1, nullptr, nullptr, &float_value);
+      buffered_values_estimate[col_id] = float_writer->EstimatedBufferedValueBytes();
+
+      // Write the Double column
+      col_id++;
+      parquet::DoubleWriter* double_writer =
+          static_cast<parquet::DoubleWriter*>(rg_writer->column(col_id));
+      double double_value = i * 1.1111111;
+      double_writer->WriteBatch(1, nullptr, nullptr, &double_value);
+      buffered_values_estimate[col_id] = double_writer->EstimatedBufferedValueBytes();
+
+      // Write the ByteArray column. Make every alternate value NULL
+      col_id++;
+      parquet::ByteArrayWriter* ba_writer =
+          static_cast<parquet::ByteArrayWriter*>(rg_writer->column(col_id));
+      parquet::ByteArray ba_value;
       char hello[FIXED_LENGTH] = "parquet";
       hello[7] = static_cast<char>(static_cast<int>('0') + i / 100);
       hello[8] = static_cast<char>(static_cast<int>('0') + (i / 10) % 10);
       hello[9] = static_cast<char>(static_cast<int>('0') + i % 10);
       if (i % 2 == 0) {
         int16_t definition_level = 1;
-        value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
-        value.len = FIXED_LENGTH;
-        ba_writer->WriteBatch(1, &definition_level, nullptr, &value);
+        ba_value.ptr = reinterpret_cast<const uint8_t*>(&hello[0]);
+        ba_value.len = FIXED_LENGTH;
+        ba_writer->WriteBatch(1, &definition_level, nullptr, &ba_value);
       } else {
         int16_t definition_level = 0;
         ba_writer->WriteBatch(1, &definition_level, nullptr, nullptr);
       }
-    }
+      buffered_values_estimate[col_id] = ba_writer->EstimatedBufferedValueBytes();
 
-    // Write the FixedLengthByteArray column
-    parquet::FixedLenByteArrayWriter* flba_writer =
-        static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->NextColumn());
-    for (int i = 0; i < NUM_ROWS_PER_ROW_GROUP; i++) {
-      parquet::FixedLenByteArray value;
+      // Write the FixedLengthByteArray column
+      col_id++;
+      parquet::FixedLenByteArrayWriter* flba_writer =
+          static_cast<parquet::FixedLenByteArrayWriter*>(rg_writer->column(col_id));
+      parquet::FixedLenByteArray flba_value;
       char v = static_cast<char>(i);
       char flba[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
-      value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
+      flba_value.ptr = reinterpret_cast<const uint8_t*>(&flba[0]);
 
-      flba_writer->WriteBatch(1, nullptr, nullptr, &value);
+      flba_writer->WriteBatch(1, nullptr, nullptr, &flba_value);
+      buffered_values_estimate[col_id] = flba_writer->EstimatedBufferedValueBytes();
     }
 
+    // Close the RowGroupWriter
+    rg_writer->Close();
     // Close the ParquetFileWriter
     file_writer->Close();
 
@@ -236,34 +205,35 @@ int main(int argc, char** argv) {
     // Get the File MetaData
     std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
 
-    // Get the number of RowGroups
     int num_row_groups = file_metadata->num_row_groups();
-    assert(num_row_groups == 1);
 
     // Get the number of Columns
     int num_columns = file_metadata->num_columns();
     assert(num_columns == 8);
 
+    std::vector<int> col_row_counts(num_columns, 0);
+
     // Iterate over all the RowGroups in the file
     for (int r = 0; r < num_row_groups; ++r) {
       // Get the RowGroup Reader
       std::shared_ptr<parquet::RowGroupReader> row_group_reader =
           parquet_reader->RowGroup(r);
 
+      assert(row_group_reader->metadata()->total_byte_size() < ROW_GROUP_SIZE);
+
       int64_t values_read = 0;
       int64_t rows_read = 0;
       int16_t definition_level;
       int16_t repetition_level;
-      int i;
       std::shared_ptr<parquet::ColumnReader> column_reader;
+      int col_id = 0;
 
       // Get the Column Reader for the boolean column
-      column_reader = row_group_reader->Column(0);
+      column_reader = row_group_reader->Column(col_id);
       parquet::BoolReader* bool_reader =
           static_cast<parquet::BoolReader*>(column_reader.get());
 
       // Read all the rows in the column
-      i = 0;
       while (bool_reader->HasNext()) {
         bool value;
         // Read one value at a time. The number of rows read is returned. values_read
@@ -274,17 +244,17 @@ int main(int argc, char** argv) {
         // There are no NULL values in the rows written
         assert(values_read == 1);
         // Verify the value written
-        bool expected_value = ((i % 2) == 0) ? true : false;
+        bool expected_value = ((col_row_counts[col_id] % 2) == 0) ? true : false;
         assert(value == expected_value);
-        i++;
+        col_row_counts[col_id]++;
       }
 
       // Get the Column Reader for the Int32 column
-      column_reader = row_group_reader->Column(1);
+      col_id++;
+      column_reader = row_group_reader->Column(col_id);
       parquet::Int32Reader* int32_reader =
           static_cast<parquet::Int32Reader*>(column_reader.get());
       // Read all the rows in the column
-      i = 0;
       while (int32_reader->HasNext()) {
         int32_t value;
         // Read one value at a time. The number of rows read is returned. values_read
@@ -295,16 +265,16 @@ int main(int argc, char** argv) {
         // There are no NULL values in the rows written
         assert(values_read == 1);
         // Verify the value written
-        assert(value == i);
-        i++;
+        assert(value == col_row_counts[col_id]);
+        col_row_counts[col_id]++;
       }
 
       // Get the Column Reader for the Int64 column
-      column_reader = row_group_reader->Column(2);
+      col_id++;
+      column_reader = row_group_reader->Column(col_id);
       parquet::Int64Reader* int64_reader =
           static_cast<parquet::Int64Reader*>(column_reader.get());
       // Read all the rows in the column
-      i = 0;
       while (int64_reader->HasNext()) {
         int64_t value;
         // Read one value at a time. The number of rows read is returned. values_read
@@ -316,23 +286,22 @@ int main(int argc, char** argv) {
         // There are no NULL values in the rows written
         assert(values_read == 1);
         // Verify the value written
-        int64_t expected_value = i * 1000 * 1000;
-        expected_value *= 1000 * 1000;
+        int64_t expected_value = col_row_counts[col_id];
         assert(value == expected_value);
-        if ((i % 2) == 0) {
-          assert(repetition_level == 1);
-        } else {
+        if ((col_row_counts[col_id] % 2) == 0) {
           assert(repetition_level == 0);
+        } else {
+          assert(repetition_level == 1);
         }
-        i++;
+        col_row_counts[col_id]++;
       }
 
       // Get the Column Reader for the Int96 column
-      column_reader = row_group_reader->Column(3);
+      col_id++;
+      column_reader = row_group_reader->Column(col_id);
       parquet::Int96Reader* int96_reader =
           static_cast<parquet::Int96Reader*>(column_reader.get());
       // Read all the rows in the column
-      i = 0;
       while (int96_reader->HasNext()) {
         parquet::Int96 value;
         // Read one value at a time. The number of rows read is returned. values_read
@@ -344,21 +313,21 @@ int main(int argc, char** argv) {
         assert(values_read == 1);
         // Verify the value written
         parquet::Int96 expected_value;
-        expected_value.value[0] = i;
-        expected_value.value[1] = i + 1;
-        expected_value.value[2] = i + 2;
+        expected_value.value[0] = col_row_counts[col_id];
+        expected_value.value[1] = col_row_counts[col_id] + 1;
+        expected_value.value[2] = col_row_counts[col_id] + 2;
         for (int j = 0; j < 3; j++) {
           assert(value.value[j] == expected_value.value[j]);
         }
-        i++;
+        col_row_counts[col_id]++;
       }
 
       // Get the Column Reader for the Float column
-      column_reader = row_group_reader->Column(4);
+      col_id++;
+      column_reader = row_group_reader->Column(col_id);
       parquet::FloatReader* float_reader =
           static_cast<parquet::FloatReader*>(column_reader.get());
       // Read all the rows in the column
-      i = 0;
       while (float_reader->HasNext()) {
         float value;
         // Read one value at a time. The number of rows read is returned. values_read
@@ -369,17 +338,17 @@ int main(int argc, char** argv) {
         // There are no NULL values in the rows written
         assert(values_read == 1);
         // Verify the value written
-        float expected_value = static_cast<float>(i) * 1.1f;
+        float expected_value = static_cast<float>(col_row_counts[col_id]) * 1.1f;
         assert(value == expected_value);
-        i++;
+        col_row_counts[col_id]++;
       }
 
       // Get the Column Reader for the Double column
-      column_reader = row_group_reader->Column(5);
+      col_id++;
+      column_reader = row_group_reader->Column(col_id);
       parquet::DoubleReader* double_reader =
           static_cast<parquet::DoubleReader*>(column_reader.get());
       // Read all the rows in the column
-      i = 0;
       while (double_reader->HasNext()) {
         double value;
         // Read one value at a time. The number of rows read is returned. values_read
@@ -390,17 +359,17 @@ int main(int argc, char** argv) {
         // There are no NULL values in the rows written
         assert(values_read == 1);
         // Verify the value written
-        double expected_value = i * 1.1111111;
+        double expected_value = col_row_counts[col_id] * 1.1111111;
         assert(value == expected_value);
-        i++;
+        col_row_counts[col_id]++;
       }
 
       // Get the Column Reader for the ByteArray column
-      column_reader = row_group_reader->Column(6);
+      col_id++;
+      column_reader = row_group_reader->Column(col_id);
       parquet::ByteArrayReader* ba_reader =
           static_cast<parquet::ByteArrayReader*>(column_reader.get());
       // Read all the rows in the column
-      i = 0;
       while (ba_reader->HasNext()) {
         parquet::ByteArray value;
         // Read one value at a time. The number of rows read is returned. values_read
@@ -411,10 +380,10 @@ int main(int argc, char** argv) {
         assert(rows_read == 1);
         // Verify the value written
         char expected_value[FIXED_LENGTH] = "parquet";
-        expected_value[7] = static_cast<char>('0' + i / 100);
-        expected_value[8] = static_cast<char>('0' + (i / 10) % 10);
-        expected_value[9] = static_cast<char>('0' + i % 10);
-        if (i % 2 == 0) {  // only alternate values exist
+        expected_value[7] = static_cast<char>('0' + col_row_counts[col_id] / 100);
+        expected_value[8] = static_cast<char>('0' + (col_row_counts[col_id] / 10) % 10);
+        expected_value[9] = static_cast<char>('0' + col_row_counts[col_id] % 10);
+        if (col_row_counts[col_id] % 2 == 0) {  // only alternate values exist
           // There are no NULL values in the rows written
           assert(values_read == 1);
           assert(value.len == FIXED_LENGTH);
@@ -425,15 +394,15 @@ int main(int argc, char** argv) {
           assert(values_read == 0);
           assert(definition_level == 0);
         }
-        i++;
+        col_row_counts[col_id]++;
       }
 
       // Get the Column Reader for the FixedLengthByteArray column
-      column_reader = row_group_reader->Column(7);
+      col_id++;
+      column_reader = row_group_reader->Column(col_id);
       parquet::FixedLenByteArrayReader* flba_reader =
           static_cast<parquet::FixedLenByteArrayReader*>(column_reader.get());
       // Read all the rows in the column
-      i = 0;
       while (flba_reader->HasNext()) {
         parquet::FixedLenByteArray value;
         // Read one value at a time. The number of rows read is returned. values_read
@@ -444,10 +413,10 @@ int main(int argc, char** argv) {
         // There are no NULL values in the rows written
         assert(values_read == 1);
         // Verify the value written
-        char v = static_cast<char>(i);
+        char v = static_cast<char>(col_row_counts[col_id]);
         char expected_value[FIXED_LENGTH] = {v, v, v, v, v, v, v, v, v, v};
         assert(memcmp(value.ptr, &expected_value[0], FIXED_LENGTH) == 0);
-        i++;
+        col_row_counts[col_id]++;
       }
     }
   } catch (const std::exception& e) {
diff --git a/examples/low-level-api/reader_writer.h b/examples/low-level-api/reader_writer.h
new file mode 100644
index 0000000..3fda0cf
--- /dev/null
+++ b/examples/low-level-api/reader_writer.h
@@ -0,0 +1,81 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <memory>
+
+#include <arrow/io/file.h>
+#include <arrow/util/logging.h>
+
+#include <parquet/api/reader.h>
+#include <parquet/api/writer.h>
+
+using parquet::LogicalType;
+using parquet::Repetition;
+using parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::PrimitiveNode;
+
+// Byte length of every value stored in the 'flba_field' FIXED_LEN_BYTE_ARRAY column.
+constexpr int FIXED_LENGTH = 10;
+
+// Builds the example schema: one column for each Parquet physical type, all placed
+// under a root GroupNode named 'schema'.
+static std::shared_ptr<GroupNode> SetupSchema() {
+  parquet::schema::NodeVector fields;
+  // Create a primitive node named 'boolean_field' with type:BOOLEAN,
+  // repetition:REQUIRED
+  fields.push_back(PrimitiveNode::Make("boolean_field", Repetition::REQUIRED,
+                                       Type::BOOLEAN, LogicalType::NONE));
+
+  // Create a primitive node named 'int32_field' with type:INT32, repetition:REQUIRED,
+  // logical type:TIME_MILLIS
+  fields.push_back(PrimitiveNode::Make("int32_field", Repetition::REQUIRED, Type::INT32,
+                                       LogicalType::TIME_MILLIS));
+
+  // Create a primitive node named 'int64_field' with type:INT64, repetition:REPEATED
+  fields.push_back(PrimitiveNode::Make("int64_field", Repetition::REPEATED, Type::INT64,
+                                       LogicalType::NONE));
+
+  // Create a primitive node named 'int96_field' with type:INT96, repetition:REQUIRED
+  fields.push_back(PrimitiveNode::Make("int96_field", Repetition::REQUIRED, Type::INT96,
+                                       LogicalType::NONE));
+
+  // Create a primitive node named 'float_field' with type:FLOAT, repetition:REQUIRED
+  fields.push_back(PrimitiveNode::Make("float_field", Repetition::REQUIRED, Type::FLOAT,
+                                       LogicalType::NONE));
+
+  // Create a primitive node named 'double_field' with type:DOUBLE, repetition:REQUIRED
+  fields.push_back(PrimitiveNode::Make("double_field", Repetition::REQUIRED, Type::DOUBLE,
+                                       LogicalType::NONE));
+
+  // Create a primitive node named 'ba_field' with type:BYTE_ARRAY, repetition:OPTIONAL
+  fields.push_back(PrimitiveNode::Make("ba_field", Repetition::OPTIONAL, Type::BYTE_ARRAY,
+                                       LogicalType::NONE));
+
+  // Create a primitive node named 'flba_field' with type:FIXED_LEN_BYTE_ARRAY,
+  // repetition:REQUIRED, field_length = FIXED_LENGTH
+  fields.push_back(PrimitiveNode::Make("flba_field", Repetition::REQUIRED,
+                                       Type::FIXED_LEN_BYTE_ARRAY, LogicalType::NONE,
+                                       FIXED_LENGTH));
+
+  // Create a GroupNode named 'schema' using the primitive nodes defined above
+  // This GroupNode is the root node of the schema tree
+  return std::static_pointer_cast<GroupNode>(
+      GroupNode::Make("schema", Repetition::REQUIRED, fields));
+}
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 19837db..631bb71 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -79,8 +79,7 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type NonNullA
     size_t size, std::shared_ptr<Array>* out) {
   using c_type = typename ArrowType::c_type;
   std::vector<c_type> values;
-  ::arrow::random_real(size, 0, static_cast<c_type>(0), static_cast<c_type>(1),
-                             &values);
+  ::arrow::random_real(size, 0, static_cast<c_type>(0), static_cast<c_type>(1), &values);
   ::arrow::NumericBuilder<ArrowType> builder;
   RETURN_NOT_OK(builder.AppendValues(values.data(), values.size()));
   return builder.Finish(out);
@@ -200,8 +199,8 @@ typename std::enable_if<is_arrow_float<ArrowType>::value, Status>::type Nullable
     size_t size, size_t num_nulls, uint32_t seed, std::shared_ptr<Array>* out) {
   using c_type = typename ArrowType::c_type;
   std::vector<c_type> values;
-  ::arrow::random_real(size, seed, static_cast<c_type>(-1e10),
-                             static_cast<c_type>(1e10), &values);
+  ::arrow::random_real(size, seed, static_cast<c_type>(-1e10), static_cast<c_type>(1e10),
+                       &values);
   std::vector<uint8_t> valid_bytes(size, 1);
 
   for (size_t i = 0; i < num_nulls; i++) {
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index dd89281..e87d549 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -47,7 +47,6 @@ const int LARGE_SIZE = 100000;
 const int VERY_LARGE_SIZE = 400000;
 #endif
 
-
 template <typename TestType>
 class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
  public:
diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc
index 934530c..a65bda8 100644
--- a/src/parquet/column_writer.cc
+++ b/src/parquet/column_writer.cc
@@ -249,6 +249,16 @@ class SerializedPageWriter : public PageWriter {
 
   bool has_compressor() override { return (compressor_ != nullptr); }
 
+  // Totals and page offsets tracked by this writer; BufferedPageWriter::Close reads
+  // them and shifts the offsets by the final sink's position.
+  int64_t num_values() const { return num_values_; }
+
+  int64_t dictionary_page_offset() const { return dictionary_page_offset_; }
+  int64_t data_page_offset() const { return data_page_offset_; }
+
+  int64_t total_compressed_size() const { return total_compressed_size_; }
+  int64_t total_uncompressed_size() const { return total_uncompressed_size_; }
+
  private:
   OutputStream* sink_;
   ColumnChunkMetaDataBuilder* metadata_;
@@ -263,11 +273,64 @@ class SerializedPageWriter : public PageWriter {
   std::unique_ptr<::arrow::Codec> compressor_;
 };
 
+// PageWriter that stages all pages in memory and copies them to the final sink on Close.
+class BufferedPageWriter : public PageWriter {
+ public:
+  BufferedPageWriter(OutputStream* sink, Compression::type codec,
+                     ColumnChunkMetaDataBuilder* metadata,
+                     ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : final_sink_(sink),
+        metadata_(metadata),
+        in_memory_sink_(new InMemoryOutputStream(pool)),
+        pager_(new SerializedPageWriter(in_memory_sink_.get(), codec, metadata, pool)) {}
+
+  int64_t WriteDictionaryPage(const DictionaryPage& page) override {
+    return pager_->WriteDictionaryPage(page);
+  }
+
+  void Close(bool has_dictionary, bool fallback) override {
+    // pager_ offsets are relative to the in-memory buffer, so rebase them onto the final
+    // sink. With no dictionary page, report 0 rather than a bogus `base + 0` offset.
+    const int64_t base = final_sink_->Tell();
+    const int64_t dict_off = has_dictionary ? pager_->dictionary_page_offset() + base : 0;
+    // index_page_offset = -1 since they are not supported
+    metadata_->Finish(pager_->num_values(), dict_off, -1,
+                      pager_->data_page_offset() + base, pager_->total_compressed_size(),
+                      pager_->total_uncompressed_size(), has_dictionary, fallback);
+    // Write metadata at end of column chunk, then flush everything to the final sink
+    metadata_->WriteTo(in_memory_sink_.get());
+    auto buffer = in_memory_sink_->GetBuffer();
+    final_sink_->Write(buffer->data(), buffer->size());
+  }
+
+  int64_t WriteDataPage(const CompressedDataPage& page) override {
+    return pager_->WriteDataPage(page);
+  }
+
+  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override {
+    pager_->Compress(src_buffer, dest_buffer);
+  }
+
+  bool has_compressor() override { return pager_->has_compressor(); }
+
+ private:
+  OutputStream* final_sink_;
+  ColumnChunkMetaDataBuilder* metadata_;
+  std::unique_ptr<InMemoryOutputStream> in_memory_sink_;
+  std::unique_ptr<SerializedPageWriter> pager_;
+};
+
 std::unique_ptr<PageWriter> PageWriter::Open(OutputStream* sink, Compression::type codec,
                                              ColumnChunkMetaDataBuilder* metadata,
-                                             ::arrow::MemoryPool* pool) {
-  return std::unique_ptr<PageWriter>(
-      new SerializedPageWriter(sink, codec, metadata, pool));
+                                             ::arrow::MemoryPool* pool,
+                                             bool buffered_row_group) {
+  if (buffered_row_group) {
+    return std::unique_ptr<PageWriter>(
+        new BufferedPageWriter(sink, codec, metadata, pool));
+  } else {
+    return std::unique_ptr<PageWriter>(
+        new SerializedPageWriter(sink, codec, metadata, pool));
+  }
 }
 
 // ----------------------------------------------------------------------
@@ -294,6 +357,7 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
       num_buffered_encoded_values_(0),
       rows_written_(0),
       total_bytes_written_(0),
+      total_compressed_bytes_(0),
       closed_(false),
       fallback_(false) {
   definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
@@ -404,6 +468,7 @@ void ColumnWriter::AddDataPage() {
     CompressedDataPage page(compressed_data_copy,
                             static_cast<int32_t>(num_buffered_values_), encoding_,
                             Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
+    total_compressed_bytes_ += page.size() + sizeof(format::PageHeader);
     data_pages_.push_back(std::move(page));
   } else {  // Eagerly write pages
     CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
@@ -432,7 +497,7 @@ int64_t ColumnWriter::Close() {
     FlushBufferedDataPages();
 
     EncodedStatistics chunk_statistics = GetChunkStatistics();
-    // Write stats only if the column has atleast one row written
+    // Write stats only if the column has at least one row written
     // From parquet-mr
     // Don't write stats larger than the max size rather than truncating. The
     // rationale is that some engines may use the minimum value in the page as
@@ -459,6 +524,7 @@ void ColumnWriter::FlushBufferedDataPages() {
     WriteDataPage(data_pages_[i]);
   }
   data_pages_.clear();
+  total_compressed_bytes_ = 0;
 }
 
 // ----------------------------------------------------------------------
diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h
index 6b84748..1ba428a 100644
--- a/src/parquet/column_writer.h
+++ b/src/parquet/column_writer.h
@@ -75,7 +75,8 @@ class PageWriter {
 
   static std::unique_ptr<PageWriter> Open(
       OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool(),
+      bool buffered_row_group = false);
 
   // The Column Writer decides if dictionary encoding is used if set and
   // if the dictionary encoding has fallen back to default encoding on reaching dictionary
@@ -117,6 +118,12 @@ class PARQUET_EXPORT ColumnWriter {
 
   int64_t rows_written() const { return rows_written_; }
 
+  // Size of the compressed data pages buffered but not yet written (page size plus
+  // sizeof(format::PageHeader) each); values not yet encoded into a page are excluded.
+  int64_t total_compressed_bytes() const { return total_compressed_bytes_; }
+
+  int64_t total_bytes_written() const { return total_bytes_written_; }
+
   const WriterProperties* properties() { return properties_; }
 
  protected:
@@ -192,6 +199,9 @@ class PARQUET_EXPORT ColumnWriter {
   // Records the total number of bytes written by the serializer
   int64_t total_bytes_written_;
 
+  // Records the current number of compressed bytes in a column
+  int64_t total_compressed_bytes_;
+
   // Flag to check if the Writer has been closed
   bool closed_;
 
@@ -258,6 +268,11 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
                         const int16_t* rep_levels, const uint8_t* valid_bits,
                         int64_t valid_bits_offset, const T* values);
 
+  // Estimated size of the values that are not written to a page yet
+  int64_t EstimatedBufferedValueBytes() const {
+    return current_encoder_->EstimatedDataEncodedSize();
+  }
+
  protected:
   std::shared_ptr<Buffer> GetValuesBuffer() override {
     return current_encoder_->FlushValues();
diff --git a/src/parquet/file-serialize-test.cc b/src/parquet/file-serialize-test.cc
index 1993404..750faa2 100644
--- a/src/parquet/file-serialize-test.cc
+++ b/src/parquet/file-serialize-test.cc
@@ -41,8 +41,9 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
 
   void SetUp() {
     num_columns_ = 4;
-    num_rowgroups_ = 2;
+    num_rowgroups_ = 4;
     rows_per_rowgroup_ = 50;
+    rows_per_batch_ = 10;
     this->SetUpSchema(Repetition::OPTIONAL, num_columns_);
   }
 
@@ -50,6 +51,7 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
   int num_columns_;
   int num_rowgroups_;
   int rows_per_rowgroup_;
+  int rows_per_batch_;
 
   void FileSerializeTest(Compression::type codec_type) {
     std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
@@ -63,20 +65,44 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
     std::shared_ptr<WriterProperties> writer_properties = prop_builder.build();
 
     auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
-    for (int rg = 0; rg < num_rowgroups_; ++rg) {
+    this->GenerateData(rows_per_rowgroup_);
+    for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
       RowGroupWriter* row_group_writer;
       row_group_writer = file_writer->AppendRowGroup();
-      this->GenerateData(rows_per_rowgroup_);
       for (int col = 0; col < num_columns_; ++col) {
         auto column_writer =
             static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
         column_writer->WriteBatch(rows_per_rowgroup_, this->def_levels_.data(), nullptr,
                                   this->values_ptr_);
         column_writer->Close();
+        // Ensure column() API which is specific to BufferedRowGroup cannot be called
+        ASSERT_THROW(row_group_writer->column(col), ParquetException);
       }
 
       row_group_writer->Close();
     }
+    // Write half BufferedRowGroups
+    for (int rg = 0; rg < num_rowgroups_ / 2; ++rg) {
+      RowGroupWriter* row_group_writer;
+      row_group_writer = file_writer->AppendBufferedRowGroup();
+      for (int batch = 0; batch < (rows_per_rowgroup_ / rows_per_batch_); ++batch) {
+        for (int col = 0; col < num_columns_; ++col) {
+          auto column_writer =
+              static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+          column_writer->WriteBatch(
+              rows_per_batch_, this->def_levels_.data() + (batch * rows_per_batch_),
+              nullptr, this->values_ptr_ + (batch * rows_per_batch_));
+          // Ensure NextColumn() API which is specific to RowGroup cannot be called
+          ASSERT_THROW(row_group_writer->NextColumn(), ParquetException);
+        }
+      }
+      for (int col = 0; col < num_columns_; ++col) {
+        auto column_writer =
+            static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+        column_writer->Close();
+      }
+      row_group_writer->Close();
+    }
     file_writer->Close();
 
     auto buffer = sink->GetBuffer();
@@ -137,6 +163,30 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
     file_writer->Close();
   }
 
+  // Writes one buffered row group; column `col` gets rows_per_column[col] levels.
+  // max_rows is passed to GenerateData; unequal counts should throw ParquetException.
+  void UnequalNumRowsBuffered(int64_t max_rows,
+                              const std::vector<int64_t>& rows_per_column) {
+    std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+    auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
+
+    std::shared_ptr<WriterProperties> props = WriterProperties::Builder().build();
+
+    auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
+    RowGroupWriter* row_group_writer = file_writer->AppendBufferedRowGroup();
+
+    this->GenerateData(max_rows);
+    for (int col = 0; col < num_columns_; ++col) {
+      auto column_writer =
+          static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+      column_writer->WriteBatch(rows_per_column[col], this->def_levels_.data(), nullptr,
+                                this->values_ptr_);
+      column_writer->Close();
+    }
+    row_group_writer->Close();
+    file_writer->Close();
+  }
+
   void RepeatedUnequalRows() {
     // Optional and repeated, so definition and repetition levels
     this->SetUpSchema(Repetition::REPEATED);
@@ -186,15 +236,23 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
     auto file_writer = ParquetFileWriter::Open(sink, gnode, props);
 
     RowGroupWriter* row_group_writer;
-    row_group_writer = file_writer->AppendRowGroup();
 
+    row_group_writer = file_writer->AppendRowGroup();
     for (int col = 0; col < num_columns_; ++col) {
       auto column_writer =
           static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
       column_writer->Close();
     }
+    row_group_writer->Close();
 
+    row_group_writer = file_writer->AppendBufferedRowGroup();
+    for (int col = 0; col < num_columns_; ++col) {
+      auto column_writer =
+          static_cast<TypedColumnWriter<TestType>*>(row_group_writer->column(col));
+      column_writer->Close();
+    }
     row_group_writer->Close();
+
     file_writer->Close();
   }
 };
@@ -212,11 +270,13 @@ TYPED_TEST(TestSerialize, SmallFileUncompressed) {
 TYPED_TEST(TestSerialize, TooFewRows) {
   std::vector<int64_t> num_rows = {100, 100, 100, 99};
   ASSERT_THROW(this->UnequalNumRows(100, num_rows), ParquetException);
+  ASSERT_THROW(this->UnequalNumRowsBuffered(100, num_rows), ParquetException);
 }
 
 TYPED_TEST(TestSerialize, TooManyRows) {
   std::vector<int64_t> num_rows = {100, 100, 100, 101};
   ASSERT_THROW(this->UnequalNumRows(101, num_rows), ParquetException);
+  ASSERT_THROW(this->UnequalNumRowsBuffered(101, num_rows), ParquetException);
 }
 
 TYPED_TEST(TestSerialize, ZeroRows) { ASSERT_NO_THROW(this->ZeroRowsRowGroup()); }
diff --git a/src/parquet/file_writer.cc b/src/parquet/file_writer.cc
index 9b2d9b0..30673c5 100644
--- a/src/parquet/file_writer.cc
+++ b/src/parquet/file_writer.cc
@@ -17,6 +17,8 @@
 
 #include "parquet/file_writer.h"
 
+#include <vector>
+
 #include "parquet/column_writer.h"
 #include "parquet/schema-internal.h"
 #include "parquet/schema.h"
@@ -47,12 +49,28 @@ void RowGroupWriter::Close() {
 
 ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
 
+ColumnWriter* RowGroupWriter::column(int i) { return contents_->column(i); }
+
+int64_t RowGroupWriter::total_compressed_bytes() const {
+  return contents_->total_compressed_bytes();
+}
+
+int64_t RowGroupWriter::total_bytes_written() const {
+  return contents_->total_bytes_written();
+}
+
 int RowGroupWriter::current_column() { return contents_->current_column(); }
 
 int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
 
 int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
 
+static void ThrowRowsMisMatchError(int col, int64_t prev, int64_t curr) {
+  std::stringstream ss;  // curr = rows in column `col`, prev = rows expected
+  ss << "Column " << col << " had " << curr << " while previous column had " << prev;
+  throw ParquetException(ss.str());
+}
+
 // ----------------------------------------------------------------------
 // RowGroupSerializer
 
@@ -60,34 +78,45 @@ int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
 class RowGroupSerializer : public RowGroupWriter::Contents {
  public:
   RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
-                     const WriterProperties* properties)
+                     const WriterProperties* properties, bool buffered_row_group = false)
       : sink_(sink),
         metadata_(metadata),
         properties_(properties),
         total_bytes_written_(0),
         closed_(false),
         current_column_index_(0),
-        num_rows_(-1) {}
+        num_rows_(0),
+        buffered_row_group_(buffered_row_group) {
+    if (buffered_row_group) {
+      InitColumns();
+    } else {
+      column_writers_.push_back(nullptr);
+    }
+  }
 
   int num_columns() const override { return metadata_->num_columns(); }
 
   int64_t num_rows() const override {
-    if (current_column_writer_) {
-      CheckRowsWritten();
-    }
-    return num_rows_ < 0 ? 0 : num_rows_;
+    CheckRowsWritten();
+    // CheckRowsWritten ensures num_rows_ is set correctly
+    return num_rows_;
   }
 
   ColumnWriter* NextColumn() override {
-    if (current_column_writer_) {
+    if (buffered_row_group_) {
+      throw ParquetException(
+          "NextColumn() is not supported when a RowGroup is written by size");
+    }
+
+    if (column_writers_[0]) {
       CheckRowsWritten();
     }
 
     // Throws an error if more columns are being written
     auto col_meta = metadata_->NextColumnChunk();
 
-    if (current_column_writer_) {
-      total_bytes_written_ += current_column_writer_->Close();
+    if (column_writers_[0]) {
+      total_bytes_written_ += column_writers_[0]->Close();
     }
 
     ++current_column_index_;
@@ -96,23 +125,60 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
     std::unique_ptr<PageWriter> pager =
         PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta,
                          properties_->memory_pool());
-    current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
-    return current_column_writer_.get();
+    column_writers_[0] = ColumnWriter::Make(col_meta, std::move(pager), properties_);
+    return column_writers_[0].get();
+  }
+
+  ColumnWriter* column(int i) override {
+    if (!buffered_row_group_) {
+      throw ParquetException(
+          "column() is only supported when a BufferedRowGroup is being written");
+    }
+
+    if (i >= 0 && i < static_cast<int>(column_writers_.size())) {
+      return column_writers_[i].get();
+    }
+    return nullptr;
   }
 
   int current_column() const override { return metadata_->current_column(); }
 
+  int64_t total_compressed_bytes() const override {
+    // Buffered (compressed, not yet written) page bytes summed over the live column
+    // writers; empty slots contribute nothing.
+    int64_t total = 0;
+    for (const auto& writer : column_writers_) {
+      if (writer) total += writer->total_compressed_bytes();
+    }
+    return total;
+  }
+
+  int64_t total_bytes_written() const override {
+    // Columns already closed were folded into total_bytes_written_ via their Close()
+    // result (NextColumn / Close); add what the still-open writers have written.
+    int64_t total = total_bytes_written_;
+    for (const auto& writer : column_writers_) {
+      if (writer) total += writer->total_bytes_written();
+    }
+    return total;
+  }
+
   void Close() override {
     if (!closed_) {
       closed_ = true;
+      CheckRowsWritten();
 
-      if (current_column_writer_) {
-        CheckRowsWritten();
-        total_bytes_written_ += current_column_writer_->Close();
-        current_column_writer_.reset();
+      for (size_t i = 0; i < column_writers_.size(); i++) {
+        if (column_writers_[i]) {
+          total_bytes_written_ += column_writers_[i]->Close();
+          column_writers_[i].reset();
+        }
       }
 
+      column_writers_.clear();
+
       // Ensures all columns have been written
+      metadata_->set_num_rows(num_rows_);
       metadata_->Finish(total_bytes_written_);
     }
   }
@@ -125,21 +191,43 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   bool closed_;
   int current_column_index_;
   mutable int64_t num_rows_;
+  bool buffered_row_group_;
 
   void CheckRowsWritten() const {
-    int64_t current_rows = current_column_writer_->rows_written();
-    if (num_rows_ < 0) {
-      num_rows_ = current_rows;
-      metadata_->set_num_rows(current_rows);
-    } else if (num_rows_ != current_rows) {
-      std::stringstream ss;
-      ss << "Column " << current_column_index_ << " had " << current_rows
-         << " while previous column had " << num_rows_;
-      throw ParquetException(ss.str());
+    if (!buffered_row_group_ && !column_writers_.empty() && column_writers_[0]) {
+      // One column at a time: compare it with the count recorded for earlier columns.
+      int64_t current_col_rows = column_writers_[0]->rows_written();
+      if (num_rows_ == 0) {
+        num_rows_ = current_col_rows;
+      } else if (num_rows_ != current_col_rows) {
+        ThrowRowsMisMatchError(current_column_index_, num_rows_, current_col_rows);
+      }
+    } else if (buffered_row_group_ && !column_writers_.empty()) {
+      // Buffered row group: every column writer is live and must match column 0.
+      int64_t current_col_rows = column_writers_[0]->rows_written();
+      for (int i = 1; i < static_cast<int>(column_writers_.size()); i++) {
+        int64_t current_col_rows_i = column_writers_[i]->rows_written();
+        if (current_col_rows != current_col_rows_i) {
+          ThrowRowsMisMatchError(i, current_col_rows, current_col_rows_i);
+        }
+      }
+      num_rows_ = current_col_rows;
+    }
+  }
+
+  void InitColumns() {
+    for (int i = 0; i < num_columns(); i++) {
+      auto col_meta = metadata_->NextColumnChunk();
+      const ColumnDescriptor* column_descr = col_meta->descr();
+      std::unique_ptr<PageWriter> pager =
+          PageWriter::Open(sink_, properties_->compression(column_descr->path()),
+                           col_meta, properties_->memory_pool(), buffered_row_group_);
+      column_writers_.push_back(
+          ColumnWriter::Make(col_meta, std::move(pager), properties_));
     }
   }
 
-  std::shared_ptr<ColumnWriter> current_column_writer_;
+  std::vector<std::shared_ptr<ColumnWriter>> column_writers_;
 };
 
 // ----------------------------------------------------------------------
@@ -187,18 +275,22 @@ class FileSerializer : public ParquetFileWriter::Contents {
     return properties_;
   }
 
-  RowGroupWriter* AppendRowGroup() override {
+  RowGroupWriter* AppendRowGroup(bool buffered_row_group) {
     if (row_group_writer_) {
       row_group_writer_->Close();
     }
     num_row_groups_++;
     auto rg_metadata = metadata_->AppendRowGroup();
-    std::unique_ptr<RowGroupWriter::Contents> contents(
-        new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
+    std::unique_ptr<RowGroupWriter::Contents> contents(new RowGroupSerializer(
+        sink_.get(), rg_metadata, properties_.get(), buffered_row_group));
     row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
     return row_group_writer_.get();
   }
 
+  RowGroupWriter* AppendRowGroup() override { return AppendRowGroup(false); }
+
+  RowGroupWriter* AppendBufferedRowGroup() override { return AppendRowGroup(true); }
+
   ~FileSerializer() override {
     try {
       Close();
@@ -227,6 +319,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
   int num_row_groups_;
   int64_t num_rows_;
   std::unique_ptr<FileMetaDataBuilder> metadata_;
+  // Only one of the row group writers is active at a time
   std::unique_ptr<RowGroupWriter> row_group_writer_;
 
   void StartFile() {
@@ -311,6 +404,10 @@ RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
   return contents_->AppendRowGroup();
 }
 
+RowGroupWriter* ParquetFileWriter::AppendBufferedRowGroup() {
+  return contents_->AppendBufferedRowGroup();
+}
+
 RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
   return AppendRowGroup();
 }
diff --git a/src/parquet/file_writer.h b/src/parquet/file_writer.h
index de17982..cdfe06c 100644
--- a/src/parquet/file_writer.h
+++ b/src/parquet/file_writer.h
@@ -49,15 +49,25 @@ class PARQUET_EXPORT RowGroupWriter {
     virtual int num_columns() const = 0;
     virtual int64_t num_rows() const = 0;
 
+    // to be used only with ParquetFileWriter::AppendRowGroup
     virtual ColumnWriter* NextColumn() = 0;
+    // to be used only with ParquetFileWriter::AppendBufferedRowGroup
+    virtual ColumnWriter* column(int i) = 0;
+
     virtual int current_column() const = 0;
     virtual void Close() = 0;
+
+    // total bytes written by the page writer
+    virtual int64_t total_bytes_written() const = 0;
+    // total bytes still compressed but not written
+    virtual int64_t total_compressed_bytes() const = 0;
   };
 
   explicit RowGroupWriter(std::unique_ptr<Contents> contents);
 
   /// Construct a ColumnWriter for the indicated row group-relative column.
   ///
+  /// To be used only with ParquetFileWriter::AppendRowGroup
   /// Ownership is solely within the RowGroupWriter. The ColumnWriter is only
   /// valid until the next call to NextColumn or Close. As the contents are
   /// directly written to the sink, once a new column is started, the contents
@@ -69,11 +79,22 @@ class PARQUET_EXPORT RowGroupWriter {
 
   int num_columns() const;
 
+  /// Construct a ColumnWriter for the indicated row group column.
+  ///
+  /// To be used only with ParquetFileWriter::AppendBufferedRowGroup
+  /// Ownership is solely within the RowGroupWriter. The ColumnWriter is
+  /// valid until Close. The contents are buffered in memory and written to sink
+  /// on Close
+  ColumnWriter* column(int i);
+
   /**
    * Number of rows that shall be written as part of this RowGroup.
    */
   int64_t num_rows() const;
 
+  int64_t total_bytes_written() const;
+  int64_t total_compressed_bytes() const;
+
  private:
   // Holds a pointer to an instance of Contents implementation
   std::unique_ptr<Contents> contents_;
@@ -101,6 +122,7 @@ class PARQUET_EXPORT ParquetFileWriter {
     RowGroupWriter* AppendRowGroup(int64_t num_rows);
 
     virtual RowGroupWriter* AppendRowGroup() = 0;
+    virtual RowGroupWriter* AppendBufferedRowGroup() = 0;
 
     virtual int64_t num_rows() const = 0;
     virtual int num_columns() const = 0;
@@ -142,7 +164,7 @@ class PARQUET_EXPORT ParquetFileWriter {
   // Construct a RowGroupWriter for the indicated number of rows.
   //
   // Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
-  // until the next call to AppendRowGroup or Close.
+  // until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
   // @param num_rows The number of rows that are stored in the new RowGroup
   //
   // \deprecated Since 1.3.0
@@ -151,9 +173,16 @@ class PARQUET_EXPORT ParquetFileWriter {
   /// Construct a RowGroupWriter with an arbitrary number of rows.
   ///
   /// Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
-  /// until the next call to AppendRowGroup or Close.
+  /// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
   RowGroupWriter* AppendRowGroup();
 
+  /// Construct a RowGroupWriter that buffers all the values until the RowGroup is ready.
+  /// Use this if you want to write a RowGroup based on a certain size
+  ///
+  /// Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
+  /// until the next call to AppendRowGroup or AppendBufferedRowGroup or Close.
+  RowGroupWriter* AppendBufferedRowGroup();
+
   /// Number of columns.
   ///
   /// This number is fixed during the lifetime of the writer as it is determined via


Mime
View raw message