parquet-commits mailing list archives

From w...@apache.org
Subject [2/2] parquet-cpp git commit: PARQUET-1100: Introduce RecordReader interface to better support nested data, refactor parquet/arrow/reader
Date Wed, 20 Sep 2017 01:39:01 GMT
PARQUET-1100: Introduce RecordReader interface to better support nested data, refactor parquet/arrow/reader

We did not have consistent logic around reading values from leaf nodes versus reading semantic records where the repetition level is greater than zero. This patch introduces a reader class that reads from column chunks until it identifies the end of each record. It also reads values (with spaces, if required by the schema) into internal buffers. This permitted a substantial refactoring and simplification of the code in parquet::arrow, where we previously handled the interpretation of batch reads as records manually.
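
To make the record-delimiting idea concrete, here is a minimal sketch (illustrative only, not the DelimitRecords code from this patch) of how repetition levels mark record boundaries: a new record begins at every position whose repetition level is zero.

#include <cstdint>
#include <vector>

// Count how many records begin within a batch of repetition levels.
// rep_level == 0 marks the start of a new record; rep_level > 0 continues
// a repeated field within the current record.
int64_t CountRecordStarts(const std::vector<int16_t>& rep_levels) {
  int64_t num_records = 0;
  for (int16_t level : rep_levels) {
    if (level == 0) {
      ++num_records;
    }
  }
  return num_records;
}

// Example: a list column holding [[1, 2], [], [3]] has
// rep_levels = {0, 1, 0, 0}, so three records begin in this batch.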

As a follow-up patch, we should be able to take a collection of record readers from the same "tree" in a nested type, reassemble the intermediate Arrow structure, and deal with any redundant structure information in the repetition and definition levels. This should allow a unification of our nested data read code path so that we can read arbitrary nested structures.
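
For orientation, the sketch below shows roughly how the new reader is driven. RecordReader::Make, def_levels(), rep_levels(), and levels_position() appear in the diff below; SetPageReader() and ReadRecords() are assumed names for the chunk-advancing and record-reading calls and may differ from the actual interface.

#include <memory>

#include "parquet/arrow/record_reader.h"  // added by this patch

using parquet::internal::RecordReader;

// Illustrative only: read up to num_records semantic records from one
// column, pulling page readers for successive column chunks from a
// FileColumnIterator (see reader.cc below).
int64_t ReadColumnRecords(RecordReader* reader, FileColumnIterator* input,
                          int64_t num_records) {
  int64_t records_read = 0;
  std::unique_ptr<::parquet::PageReader> chunk;
  while (records_read < num_records && (chunk = input->NextChunk()) != nullptr) {
    reader->SetPageReader(std::move(chunk));  // assumed API
    records_read += reader->ReadRecords(num_records - records_read);  // assumed API
  }
  // The reader has now buffered def/rep levels and (possibly spaced) values;
  // see reader->def_levels(), reader->rep_levels(), reader->levels_position().
  return records_read;
}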

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #398 from wesm/PARQUET-1100 and squashes the following commits:

9ea85d9 [Wes McKinney] Revert to const args
f4dc0fe [Wes McKinney] Make parquet::schema::Node non-copyable. Use const-refs instead of const-ptr for non-nullable argument
0d859cc [Wes McKinney] Code review comments, scrubbing some flakes
1368415 [Wes McKinney] Fix more MSVC warnings
eccb84c [Wes McKinney] Give macro more accurate name
0eaada0 [Wes McKinney] Use int64_t instead of int for batch sizes
79c3709 [Wes McKinney] Add documentation. Remove RecordReader from public API
8fa619b [Wes McKinney] Initialize memory in DecodeSpaced to avoid undefined behavior
5a0c860 [Wes McKinney] Remove non-repeated branch from DelimitRecords
c754e6e [Wes McKinney] Refactor to skip record delimiting for non-repeated data
ed2a03f [Wes McKinney] Move more code into TypedRecordReader
2e934e9 [Wes McKinney] Set some integers as const
58d3a0f [Wes McKinney] Do not index into levels arrays
b766371 [Wes McKinney] Add RecordReader::Reserve to preallocate, fixing perf regression. cpplint
1bf3e8f [Wes McKinney] Refactor to create stateful parquet::RecordReader class to better support nested data. Shift value buffering logic from parquet/arrow/reader into RecordReader. Fix bug described in PARQUET-1100


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/4b09ac70
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/4b09ac70
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/4b09ac70

Branch: refs/heads/master
Commit: 4b09ac703bc75fee72f94bed8ecfe571096b04c1
Parents: 18ca392
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Tue Sep 19 21:38:52 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Tue Sep 19 21:38:52 2017 -0400

----------------------------------------------------------------------
 CMakeLists.txt                                  |    1 +
 .../arrow/arrow-reader-writer-benchmark.cc      |   36 +-
 src/parquet/arrow/arrow-reader-writer-test.cc   |  208 +++-
 src/parquet/arrow/reader.cc                     | 1048 ++++++------------
 src/parquet/arrow/reader.h                      |    8 +-
 src/parquet/arrow/record_reader.cc              |  807 ++++++++++++++
 src/parquet/arrow/record_reader.h               |  113 ++
 src/parquet/arrow/schema.cc                     |  140 ++-
 src/parquet/arrow/schema.h                      |   17 +-
 src/parquet/column_reader.cc                    |    5 +-
 src/parquet/column_reader.h                     |  175 +--
 src/parquet/column_writer-test.cc               |    2 +-
 src/parquet/encoding-internal.h                 |    4 +-
 src/parquet/encoding.h                          |    4 +
 src/parquet/file/metadata.h                     |    8 +-
 src/parquet/file/printer.cc                     |    2 +-
 src/parquet/file/reader-internal.cc             |    2 +-
 src/parquet/file/reader.cc                      |    7 +
 src/parquet/file/reader.h                       |    2 +
 src/parquet/schema-test.cc                      |   22 +-
 src/parquet/schema.cc                           |   12 +-
 src/parquet/schema.h                            |    9 +-
 src/parquet/statistics-test.cc                  |    5 +-
 src/parquet/test-util.h                         |    2 +-
 src/parquet/util/memory-test.cc                 |    4 +-
 src/parquet/util/memory.cc                      |   12 +-
 src/parquet/util/memory.h                       |    2 +-
 src/parquet/util/schema-util.h                  |    5 +-
 28 files changed, 1701 insertions(+), 961 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1deab40..ca37b5f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -628,6 +628,7 @@ set(LIBPARQUET_SRCS
   src/parquet/types.cc
 
   src/parquet/arrow/reader.cc
+  src/parquet/arrow/record_reader.cc
   src/parquet/arrow/schema.cc
   src/parquet/arrow/writer.cc
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/arrow-reader-writer-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index e899e10..a54fb5d 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -17,6 +17,8 @@
 
 #include "benchmark/benchmark.h"
 
+#include <iostream>
+
 #include "parquet/arrow/reader.h"
 #include "parquet/arrow/writer.h"
 #include "parquet/column_reader.h"
@@ -30,13 +32,14 @@
 using arrow::BooleanBuilder;
 using arrow::NumericBuilder;
 
-#define ABORT_NOT_OK(s)                  \
-  do {                                   \
-    ::arrow::Status _s = (s);            \
-    if (ARROW_PREDICT_FALSE(!_s.ok())) { \
-      exit(-1);                          \
-    }                                    \
-  } while (0);
+#define EXIT_NOT_OK(s)                                        \
+  do {                                                        \
+    ::arrow::Status _s = (s);                                 \
+    if (ARROW_PREDICT_FALSE(!_s.ok())) {                      \
+      std::cout << "Exiting: " << _s.ToString() << std::endl; \
+      exit(EXIT_FAILURE);                                     \
+    }                                                         \
+  } while (0)
 
 namespace parquet {
 
@@ -101,12 +104,12 @@ std::shared_ptr<::arrow::Table> TableFromVector(
     std::vector<uint8_t> valid_bytes(BENCHMARK_SIZE, 0);
     int n = {0};
     std::generate(valid_bytes.begin(), valid_bytes.end(), [&n] { return n++ % 2; });
-    ABORT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
+    EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
   } else {
-    ABORT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
+    EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
   }
   std::shared_ptr<::arrow::Array> array;
-  ABORT_NOT_OK(builder.Finish(&array));
+  EXIT_NOT_OK(builder.Finish(&array));
 
   auto field = ::arrow::field("column", type, nullable);
   auto schema = std::make_shared<::arrow::Schema>(
@@ -125,12 +128,12 @@ std::shared_ptr<::arrow::Table> TableFromVector<BooleanType>(const std::vector<b
     int n = {0};
     std::generate(valid_bytes.begin(), valid_bytes.end(),
                   [&n] { return (n++ % 2) != 0; });
-    ABORT_NOT_OK(builder.Append(vec, valid_bytes));
+    EXIT_NOT_OK(builder.Append(vec, valid_bytes));
   } else {
-    ABORT_NOT_OK(builder.Append(vec));
+    EXIT_NOT_OK(builder.Append(vec));
   }
   std::shared_ptr<::arrow::Array> array;
-  ABORT_NOT_OK(builder.Finish(&array));
+  EXIT_NOT_OK(builder.Finish(&array));
 
   auto field = ::arrow::field("column", ::arrow::boolean(), nullable);
   auto schema = std::make_shared<::arrow::Schema>(
@@ -148,7 +151,7 @@ static void BM_WriteColumn(::benchmark::State& state) {
 
   while (state.KeepRunning()) {
     auto output = std::make_shared<InMemoryOutputStream>();
-    ABORT_NOT_OK(
+    EXIT_NOT_OK(
         WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
   }
   SetBytesProcessed<nullable, ParquetType>(state);
@@ -171,8 +174,7 @@ static void BM_ReadColumn(::benchmark::State& state) {
   std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128);
   std::shared_ptr<::arrow::Table> table = TableFromVector<ParquetType>(values, nullable);
   auto output = std::make_shared<InMemoryOutputStream>();
-  ABORT_NOT_OK(
-      WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
+  EXIT_NOT_OK(WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
   std::shared_ptr<Buffer> buffer = output->GetBuffer();
 
   while (state.KeepRunning()) {
@@ -180,7 +182,7 @@ static void BM_ReadColumn(::benchmark::State& state) {
         ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
     FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
     std::shared_ptr<::arrow::Table> table;
-    ABORT_NOT_OK(filereader.ReadTable(&table));
+    EXIT_NOT_OK(filereader.ReadTable(&table));
   }
   SetBytesProcessed<nullable, ParquetType>(state);
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index a1e3382..56b4770 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -53,6 +53,9 @@ using arrow::TimeUnit;
 using arrow::default_memory_pool;
 using arrow::io::BufferReader;
 
+using arrow::test::randint;
+using arrow::test::random_is_valid;
+
 using ArrowId = ::arrow::Type;
 using ParquetType = parquet::Type;
 using parquet::schema::GroupNode;
@@ -366,6 +369,45 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::arrow::DataType& type
   return std::static_pointer_cast<GroupNode>(node_);
 }
 
+void AssertArraysEqual(const Array& expected, const Array& actual) {
+  if (!actual.Equals(expected)) {
+    std::stringstream pp_result;
+    std::stringstream pp_expected;
+
+    EXPECT_OK(::arrow::PrettyPrint(actual, 0, &pp_result));
+    EXPECT_OK(::arrow::PrettyPrint(expected, 0, &pp_expected));
+    FAIL() << "Got: \n" << pp_result.str() << "\nExpected: \n" << pp_expected.str();
+  }
+}
+
+void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual) {
+  ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal";
+  if (!actual.Equals(expected)) {
+    std::stringstream pp_result;
+    std::stringstream pp_expected;
+
+    for (int i = 0; i < actual.num_chunks(); ++i) {
+      auto c1 = actual.chunk(i);
+      auto c2 = expected.chunk(i);
+      if (!c1->Equals(*c2)) {
+        EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result));
+        EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected));
+        FAIL() << "Chunk " << i << " Got: " << pp_result.str()
+               << "\nExpected: " << pp_expected.str();
+      }
+    }
+  }
+}
+
+void AssertTablesEqual(const Table& expected, const Table& actual) {
+  ASSERT_EQ(expected.num_columns(), actual.num_columns());
+
+  for (int i = 0; i < actual.num_columns(); ++i) {
+    AssertChunkedEqual(*expected.column(i)->data(), *actual.column(i)->data());
+  }
+  ASSERT_TRUE(actual.Equals(expected));
+}
+
 template <typename TestType>
 class TestParquetIO : public ::testing::Test {
  public:
@@ -394,13 +436,14 @@ class TestParquetIO : public ::testing::Test {
     ASSERT_NE(nullptr, out->get());
   }
 
-  void ReadAndCheckSingleColumnFile(::arrow::Array* values) {
-    std::shared_ptr<::arrow::Array> out;
+  void ReadAndCheckSingleColumnFile(const Array& values) {
+    std::shared_ptr<Array> out;
 
     std::unique_ptr<FileReader> reader;
     ReaderFromSink(&reader);
     ReadSingleColumnFile(std::move(reader), &out);
-    ASSERT_TRUE(values->Equals(out));
+
+    AssertArraysEqual(values, *out);
   }
 
   void ReadTableFromFile(std::unique_ptr<FileReader> reader,
@@ -440,7 +483,7 @@ class TestParquetIO : public ::testing::Test {
     *out = MakeSimpleTable(parent_lists, nullable_parent_lists);
   }
 
-  void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+  void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
     std::shared_ptr<::arrow::Table> out;
     std::unique_ptr<FileReader> reader;
     ReaderFromSink(&reader);
@@ -452,13 +495,14 @@ class TestParquetIO : public ::testing::Test {
     ASSERT_EQ(1, chunked_array->num_chunks());
     auto result = chunked_array->chunk(0);
 
-    ASSERT_TRUE(values->Equals(result));
+    AssertArraysEqual(*values, *result);
   }
 
   void CheckRoundTrip(const std::shared_ptr<Table>& table) {
     std::shared_ptr<Table> result;
     DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
-    ASSERT_TRUE(table->Equals(*result));
+
+    AssertTablesEqual(*table, *result);
   }
 
   template <typename ArrayType>
@@ -495,7 +539,7 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
       MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
   this->WriteColumn(schema, values);
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
@@ -515,7 +559,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
 
   std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
   ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+  AssertArraysEqual(*values, *chunked_array->chunk(0));
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
@@ -528,7 +573,7 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
       MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
   this->WriteColumn(schema, values);
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) {
@@ -547,7 +592,7 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) {
       MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
   this->WriteColumn(schema, dict_values);
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
@@ -558,12 +603,12 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
 
   std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
-  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+  this->ReadAndCheckSingleColumnFile(*sliced_values);
 
   // Slice offset 1 higher
   sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
-  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+  this->ReadAndCheckSingleColumnFile(*sliced_values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
@@ -574,12 +619,12 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
 
   std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
-  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+  this->ReadAndCheckSingleColumnFile(*sliced_values);
 
   // Slice offset 1 higher, thus different null bitmap.
   sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
   this->WriteColumn(schema, sliced_values);
-  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+  this->ReadAndCheckSingleColumnFile(*sliced_values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
@@ -636,7 +681,7 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
   }
   ASSERT_OK_NO_THROW(writer.Close());
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
@@ -679,7 +724,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
 
   std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
   ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+  AssertArraysEqual(*values, *chunked_array->chunk(0));
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
@@ -698,7 +744,7 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
   }
   ASSERT_OK_NO_THROW(writer.Close());
 
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
@@ -763,7 +809,7 @@ TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
   ASSERT_OK(builder.Append(val));
   std::shared_ptr<Array> values;
   ASSERT_OK(builder.Finish(&values));
-  this->ReadAndCheckSingleColumnFile(values.get());
+  this->ReadAndCheckSingleColumnFile(*values);
 }
 
 using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
@@ -850,7 +896,8 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
 
   std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
   ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+  AssertArraysEqual(*values, *chunked_array->chunk(0));
 }
 
 using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
@@ -871,7 +918,8 @@ TEST_F(TestNullParquetIO, NullColumn) {
 
   std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
   ASSERT_EQ(1, chunked_array->num_chunks());
-  ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+  AssertArraysEqual(*values, *chunked_array->chunk(0));
 }
 
 template <typename T>
@@ -1026,14 +1074,14 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
       table, 1, table->num_rows(), {}, &result,
       ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build());
 
-  ASSERT_TRUE(table->Equals(*result));
+  AssertTablesEqual(*table, *result);
 
  // Cast nanoseconds to microseconds and use INT64 physical type
   DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
   std::shared_ptr<Table> expected;
   MakeDateTimeTypesTable(&table, true);
 
-  ASSERT_TRUE(table->Equals(*result));
+  AssertTablesEqual(*table, *result);
 }
 
 TEST(TestArrowReadWrite, CoerceTimestamps) {
@@ -1097,13 +1145,14 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
   DoSimpleRoundtrip(
       input, 1, input->num_rows(), {}, &milli_result,
       ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build());
-  ASSERT_TRUE(milli_result->Equals(*ex_milli_result));
+
+  AssertTablesEqual(*ex_milli_result, *milli_result);
 
   std::shared_ptr<Table> micro_result;
   DoSimpleRoundtrip(
       input, 1, input->num_rows(), {}, &micro_result,
       ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
-  ASSERT_TRUE(micro_result->Equals(*ex_micro_result));
+  AssertTablesEqual(*ex_micro_result, *micro_result);
 }
 
 TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
@@ -1213,7 +1262,7 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
   std::shared_ptr<Table> result;
   DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
 
-  ASSERT_TRUE(result->Equals(*ex_table));
+  AssertTablesEqual(*ex_table, *result);
 }
 
 void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
@@ -1253,7 +1302,7 @@ TEST(TestArrowReadWrite, MultithreadedRead) {
   std::shared_ptr<Table> result;
   DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result);
 
-  ASSERT_TRUE(table->Equals(*result));
+  AssertTablesEqual(*table, *result);
 }
 
 TEST(TestArrowReadWrite, ReadSingleRowGroup) {
@@ -1328,7 +1377,92 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
 
   auto ex_schema = std::make_shared<::arrow::Schema>(ex_fields);
   Table expected(ex_schema, ex_columns);
-  ASSERT_TRUE(result->Equals(expected));
+  AssertTablesEqual(expected, *result);
+}
+
+void MakeListTable(int num_rows, std::shared_ptr<Table>* out) {
+  ::arrow::Int32Builder offset_builder;
+
+  std::vector<int32_t> length_draws;
+  randint(num_rows, 0, 100, &length_draws);
+
+  std::vector<int32_t> offset_values;
+
+  // Make sure some of them are length 0
+  int32_t total_elements = 0;
+  for (size_t i = 0; i < length_draws.size(); ++i) {
+    if (length_draws[i] < 10) {
+      length_draws[i] = 0;
+    }
+    offset_values.push_back(total_elements);
+    total_elements += length_draws[i];
+  }
+  offset_values.push_back(total_elements);
+
+  std::vector<int8_t> value_draws;
+  randint<int8_t>(total_elements, 0, 100, &value_draws);
+
+  std::vector<bool> is_valid;
+  random_is_valid(total_elements, 0.1, &is_valid);
+
+  std::shared_ptr<Array> values, offsets;
+  ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid,
+                                                      value_draws, &values);
+  ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets);
+
+  std::shared_ptr<Array> list_array;
+  ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(),
+                                           &list_array));
+
+  auto f1 = ::arrow::field("a", ::arrow::list(::arrow::int8()));
+  auto schema = ::arrow::schema({f1});
+  std::vector<std::shared_ptr<Array>> arrays = {list_array};
+  *out = std::make_shared<Table>(schema, arrays);
+}
+
+TEST(TestArrowReadWrite, ListLargeRecords) {
+  const int num_rows = 50;
+
+  std::shared_ptr<Table> table;
+  MakeListTable(num_rows, &table);
+
+  std::shared_ptr<Buffer> buffer;
+  WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(), &buffer);
+
+  std::unique_ptr<FileReader> reader;
+  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+                              ::arrow::default_memory_pool(),
+                              ::parquet::default_reader_properties(), nullptr, &reader));
+
+  // Read everything
+  std::shared_ptr<Table> result;
+  ASSERT_OK_NO_THROW(reader->ReadTable(&result));
+  AssertTablesEqual(*table, *result);
+
+  ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+                              ::arrow::default_memory_pool(),
+                              ::parquet::default_reader_properties(), nullptr, &reader));
+
+  std::unique_ptr<ColumnReader> col_reader;
+  ASSERT_OK(reader->GetColumn(0, &col_reader));
+
+  auto expected = table->column(0)->data()->chunk(0);
+
+  std::vector<std::shared_ptr<Array>> pieces;
+  for (int i = 0; i < num_rows; ++i) {
+    std::shared_ptr<Array> piece;
+    ASSERT_OK(col_reader->NextBatch(1, &piece));
+    ASSERT_EQ(1, piece->length());
+    pieces.push_back(piece);
+  }
+  auto chunked = std::make_shared<::arrow::ChunkedArray>(pieces);
+
+  auto chunked_col =
+      std::make_shared<::arrow::Column>(table->schema()->field(0), chunked);
+  std::vector<std::shared_ptr<::arrow::Column>> columns = {chunked_col};
+  auto chunked_table = std::make_shared<Table>(table->schema(), columns);
+
+  ASSERT_TRUE(table->Equals(*chunked_table));
 }
 
 TEST(TestArrowWrite, CheckChunkSize) {
@@ -1359,6 +1493,7 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
 
   void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
     nested_parquet_ = std::make_shared<InMemoryOutputStream>();
+
     writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, schema,
                                                default_writer_properties());
     row_group_writer_ = writer_->AppendRowGroup(num_rows);
@@ -1397,7 +1532,6 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
 
   void ValidateColumnArray(const ::arrow::Int32Array& array, size_t expected_nulls) {
     ValidateArray(array, expected_nulls);
-
     int j = 0;
     for (int i = 0; i < values_array_->length(); i++) {
       if (array.IsNull(i)) {
@@ -1515,15 +1649,19 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
 
     int num_columns = num_trees * static_cast<int>((std::pow(num_children, tree_depth)));
 
-    std::vector<int16_t> def_levels(num_rows);
-    std::vector<int16_t> rep_levels(num_rows);
-    for (int i = 0; i < num_rows; i++) {
+    std::vector<int16_t> def_levels;
+    std::vector<int16_t> rep_levels;
+
+    int num_levels = 0;
+    while (num_levels < num_rows) {
       if (node_repetition == Repetition::REQUIRED) {
-        def_levels[i] = 0;  // all is required
+        def_levels.push_back(0);  // all are required
       } else {
-        def_levels[i] = i % tree_depth;  // all is optional
+        int16_t level = static_cast<int16_t>(num_levels % (tree_depth + 2));
+        def_levels.push_back(level);  // all are optional
       }
-      rep_levels[i] = 0;  // none is repeated
+      rep_levels.push_back(0);  // none is repeated
+      ++num_levels;
     }
 
     // Produce values for the columns
@@ -1675,7 +1813,7 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
   const int num_trees = 10;
   const int depth = 5;
   const int num_children = 3;
-  int num_rows = SMALL_SIZE * depth;
+  int num_rows = SMALL_SIZE * (depth + 2);
   CreateMultiLevelNestedParquet(num_trees, depth, num_children, num_rows, GetParam());
   std::shared_ptr<Table> table;
   ASSERT_OK_NO_THROW(reader_->ReadTable(&table));

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 8d5ea7e..5edc837 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -24,13 +24,18 @@
 #include <queue>
 #include <string>
 #include <thread>
+#include <type_traits>
 #include <vector>
 
 #include "arrow/api.h"
 #include "arrow/util/bit-util.h"
 #include "arrow/util/logging.h"
+#include "arrow/util/parallel.h"
 
+#include "parquet/arrow/record_reader.h"
 #include "parquet/arrow/schema.h"
+#include "parquet/column_reader.h"
+#include "parquet/schema.h"
 #include "parquet/util/schema-util.h"
 
 using arrow::Array;
@@ -40,21 +45,28 @@ using arrow::Field;
 using arrow::Int32Array;
 using arrow::ListArray;
 using arrow::StructArray;
+using arrow::TimestampArray;
 using arrow::MemoryPool;
 using arrow::PoolBuffer;
 using arrow::Status;
 using arrow::Table;
 
-using parquet::schema::NodePtr;
+using parquet::schema::Node;
 
 // Help reduce verbosity
 using ParquetReader = parquet::ParquetFileReader;
+using arrow::ParallelFor;
+
+using parquet::internal::RecordReader;
 
 namespace parquet {
 namespace arrow {
 
+using ::arrow::BitUtil::BytesForBits;
+
 constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
-constexpr int64_t kNanosecondsInADay = 86400LL * 1000LL * 1000LL * 1000LL;
+constexpr int64_t kMillisecondsInADay = 86400000LL;
+constexpr int64_t kNanosecondsInADay = kMillisecondsInADay * 1000LL * 1000LL;
 
 static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
   int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
@@ -66,47 +78,6 @@ template <typename ArrowType>
 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
 // ----------------------------------------------------------------------
-// Helper for parallel for-loop
-
-template <class FUNCTION>
-Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
-  std::vector<std::thread> thread_pool;
-  thread_pool.reserve(nthreads);
-  std::atomic<int> task_counter(0);
-
-  std::mutex error_mtx;
-  bool error_occurred = false;
-  Status error;
-
-  for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
-    thread_pool.emplace_back(
-        [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
-          int task_id;
-          while (!error_occurred) {
-            task_id = task_counter.fetch_add(1);
-            if (task_id >= num_tasks) {
-              break;
-            }
-            Status s = func(task_id);
-            if (!s.ok()) {
-              std::lock_guard<std::mutex> lock(error_mtx);
-              error_occurred = true;
-              error = s;
-              break;
-            }
-          }
-        });
-  }
-  for (auto&& thread : thread_pool) {
-    thread.join();
-  }
-  if (error_occurred) {
-    return error;
-  }
-  return Status::OK();
-}
-
-// ----------------------------------------------------------------------
 // Iteration utilities
 
 // Abstraction to decouple row group iteration details from the ColumnReader,
@@ -120,7 +91,7 @@ class FileColumnIterator {
 
   virtual ~FileColumnIterator() {}
 
-  virtual std::shared_ptr<::parquet::ColumnReader> Next() = 0;
+  virtual std::unique_ptr<::parquet::PageReader> NextChunk() = 0;
 
   const SchemaDescriptor* schema() const { return schema_; }
 
@@ -141,10 +112,10 @@ class AllRowGroupsIterator : public FileColumnIterator {
   explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader)
       : FileColumnIterator(column_index, reader), next_row_group_(0) {}
 
-  std::shared_ptr<::parquet::ColumnReader> Next() override {
-    std::shared_ptr<::parquet::ColumnReader> result;
+  std::unique_ptr<::parquet::PageReader> NextChunk() override {
+    std::unique_ptr<::parquet::PageReader> result;
     if (next_row_group_ < reader_->metadata()->num_row_groups()) {
-      result = reader_->RowGroup(next_row_group_)->Column(column_index_);
+      result = reader_->RowGroup(next_row_group_)->GetColumnPageReader(column_index_);
       next_row_group_++;
     } else {
       result = nullptr;
@@ -164,12 +135,13 @@ class SingleRowGroupIterator : public FileColumnIterator {
         row_group_number_(row_group_number),
         done_(false) {}
 
-  std::shared_ptr<::parquet::ColumnReader> Next() override {
+  std::unique_ptr<::parquet::PageReader> NextChunk() override {
     if (done_) {
       return nullptr;
     }
 
-    auto result = reader_->RowGroup(row_group_number_)->Column(column_index_);
+    auto result =
+        reader_->RowGroup(row_group_number_)->GetColumnPageReader(column_index_);
     done_ = true;
     return result;
   };
@@ -193,8 +165,9 @@ class FileReader::Impl {
   Status ReadSchemaField(int i, std::shared_ptr<Array>* out);
   Status ReadSchemaField(int i, const std::vector<int>& indices,
                          std::shared_ptr<Array>* out);
-  Status GetReaderForNode(int index, const NodePtr& node, const std::vector<int>& indices,
-                          int16_t def_level, std::unique_ptr<ColumnReader::Impl>* out);
+  Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
+                          int16_t def_level,
+                          std::unique_ptr<ColumnReader::ColumnReaderImpl>* out);
   Status ReadColumn(int i, std::shared_ptr<Array>* out);
   Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
   Status GetSchema(const std::vector<int>& indices,
@@ -223,96 +196,54 @@ class FileReader::Impl {
   int num_threads_;
 };
 
-typedef const int16_t* ValueLevelsPtr;
-
-class ColumnReader::Impl {
+class ColumnReader::ColumnReaderImpl {
  public:
-  virtual ~Impl() {}
-  virtual Status NextBatch(int batch_size, std::shared_ptr<Array>* out) = 0;
-  virtual Status GetDefLevels(ValueLevelsPtr* data, size_t* length) = 0;
-  virtual Status GetRepLevels(ValueLevelsPtr* data, size_t* length) = 0;
+  virtual ~ColumnReaderImpl() {}
+  virtual Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) = 0;
+  virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
+  virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
   virtual const std::shared_ptr<Field> field() = 0;
 };
 
 // Reader implementation for primitive arrays
-class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::Impl {
+class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
  public:
   PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
-      : pool_(pool),
-        input_(std::move(input)),
-        descr_(input_->descr()),
-        values_buffer_(pool),
-        def_levels_buffer_(pool),
-        rep_levels_buffer_(pool) {
-    DCHECK(NodeToField(input_->descr()->schema_node(), &field_).ok());
+      : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
+    record_reader_ = RecordReader::Make(descr_, pool_);
+    DCHECK(NodeToField(*input_->descr()->schema_node(), &field_).ok());
     NextRowGroup();
   }
 
   virtual ~PrimitiveImpl() {}
 
-  Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
-
-  template <typename ArrowType, typename ParquetType>
-  Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
-
-  template <typename ArrowType>
-  Status ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out);
+  Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) override;
 
-  template <typename ArrowType>
-  Status ReadFLBABatch(int batch_size, int byte_width, std::shared_ptr<Array>* out);
+  template <typename ParquetType>
+  Status WrapIntoListArray(std::shared_ptr<Array>* array);
 
-  template <typename ArrowType>
-  Status InitDataBuffer(int batch_size);
-  Status InitValidBits(int batch_size);
-  template <typename ArrowType, typename ParquetType>
-  Status ReadNullableBatch(TypedColumnReader<ParquetType>* reader, int16_t* def_levels,
-                           int16_t* rep_levels, int64_t values_to_read,
-                           int64_t* levels_read, int64_t* values_read);
-  template <typename ArrowType, typename ParquetType>
-  Status ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
-                              int64_t values_to_read, int64_t* levels_read);
-  Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels,
-                           int64_t total_values_read, std::shared_ptr<Array>* array);
-
-  Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
-  Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+  Status GetDefLevels(const int16_t** data, size_t* length) override;
+  Status GetRepLevels(const int16_t** data, size_t* length) override;
 
   const std::shared_ptr<Field> field() override { return field_; }
 
  private:
   void NextRowGroup();
 
-  template <typename InType, typename OutType>
-  struct can_copy_ptr {
-    static constexpr bool value =
-        std::is_same<InType, OutType>::value ||
-        (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
-         (sizeof(InType) == sizeof(OutType)));
-  };
-
   MemoryPool* pool_;
   std::unique_ptr<FileColumnIterator> input_;
   const ColumnDescriptor* descr_;
 
-  std::shared_ptr<::parquet::ColumnReader> column_reader_;
-  std::shared_ptr<Field> field_;
+  std::shared_ptr<RecordReader> record_reader_;
 
-  PoolBuffer values_buffer_;
-  PoolBuffer def_levels_buffer_;
-  PoolBuffer rep_levels_buffer_;
-  std::shared_ptr<PoolBuffer> data_buffer_;
-  uint8_t* data_buffer_ptr_;
-  std::shared_ptr<PoolBuffer> valid_bits_buffer_;
-  uint8_t* valid_bits_ptr_;
-  int64_t valid_bits_idx_;
-  int64_t null_count_;
+  std::shared_ptr<Field> field_;
 };
 
 // Reader implementation for struct array
-class PARQUET_NO_EXPORT StructImpl : public ColumnReader::Impl {
+class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
  public:
-  explicit StructImpl(const std::vector<std::shared_ptr<Impl>>& children,
-                      int16_t struct_def_level, MemoryPool* pool, const NodePtr& node)
+  explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
+                      int16_t struct_def_level, MemoryPool* pool, const Node* node)
       : children_(children),
         struct_def_level_(struct_def_level),
         pool_(pool),
@@ -322,21 +253,21 @@ class PARQUET_NO_EXPORT StructImpl : public ColumnReader::Impl {
 
   virtual ~StructImpl() {}
 
-  Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
-  Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
-  Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+  Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) override;
+  Status GetDefLevels(const int16_t** data, size_t* length) override;
+  Status GetRepLevels(const int16_t** data, size_t* length) override;
   const std::shared_ptr<Field> field() override { return field_; }
 
  private:
-  std::vector<std::shared_ptr<Impl>> children_;
+  std::vector<std::shared_ptr<ColumnReaderImpl>> children_;
   int16_t struct_def_level_;
   MemoryPool* pool_;
   std::shared_ptr<Field> field_;
   PoolBuffer def_levels_buffer_;
 
-  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap,
-                              int64_t* null_count);
-  void InitField(const NodePtr& node, const std::vector<std::shared_ptr<Impl>>& children);
+  Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
+  void InitField(const Node* node,
+                 const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
 };
 
 FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
@@ -347,26 +278,26 @@ FileReader::~FileReader() {}
 Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
   std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
 
-  std::unique_ptr<ColumnReader::Impl> impl(new PrimitiveImpl(pool_, std::move(input)));
+  std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+      new PrimitiveImpl(pool_, std::move(input)));
   *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
   return Status::OK();
 }
 
-Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
-                                          const std::vector<int>& indices,
-                                          int16_t def_level,
-                                          std::unique_ptr<ColumnReader::Impl>* out) {
+Status FileReader::Impl::GetReaderForNode(
+    int index, const Node* node, const std::vector<int>& indices, int16_t def_level,
+    std::unique_ptr<ColumnReader::ColumnReaderImpl>* out) {
   *out = nullptr;
 
   if (IsSimpleStruct(node)) {
-    const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node.get());
-    std::vector<std::shared_ptr<ColumnReader::Impl>> children;
+    const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node);
+    std::vector<std::shared_ptr<ColumnReader::ColumnReaderImpl>> children;
     for (int i = 0; i < group->field_count(); i++) {
-      std::unique_ptr<ColumnReader::Impl> child_reader;
+      std::unique_ptr<ColumnReader::ColumnReaderImpl> child_reader;
       // TODO(itaiin): Remove the -1 index hack when all types of nested reads
       // are supported. This currently just signals the lower level reader resolution
       // to abort
-      RETURN_NOT_OK(GetReaderForNode(index, group->field(i), indices, def_level + 1,
+      RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices, def_level + 1,
                                      &child_reader));
       if (child_reader != nullptr) {
         children.push_back(std::move(child_reader));
@@ -374,22 +305,22 @@ Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
     }
 
     if (children.size() > 0) {
-      *out = std::unique_ptr<ColumnReader::Impl>(
+      *out = std::unique_ptr<ColumnReader::ColumnReaderImpl>(
           new StructImpl(children, def_level, pool_, node));
     }
   } else {
     // This should be a flat field case - translate the field index to
     // the correct column index by walking down to the leaf node
-    NodePtr walker = node;
+    const Node* walker = node;
     while (!walker->is_primitive()) {
       DCHECK(walker->is_group());
-      auto group = static_cast<GroupNode*>(walker.get());
+      auto group = static_cast<const GroupNode*>(walker);
       if (group->field_count() != 1) {
         return Status::NotImplemented("lists with structs are not supported.");
       }
-      walker = group->field(0);
+      walker = group->field(0).get();
     }
-    auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker.get());
+    auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker);
 
    // If the index of the column is found then a reader for the column is needed.
     // Otherwise *out keeps the nullptr value.
@@ -417,8 +348,8 @@ Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
                                          std::shared_ptr<Array>* out) {
   auto parquet_schema = reader_->metadata()->schema();
 
-  auto node = parquet_schema->group_node()->field(i);
-  std::unique_ptr<ColumnReader::Impl> reader_impl;
+  auto node = parquet_schema->group_node()->field(i).get();
+  std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
 
   RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl));
   if (reader_impl == nullptr) {
@@ -428,24 +359,28 @@ Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
 
   std::unique_ptr<ColumnReader> reader(new ColumnReader(std::move(reader_impl)));
 
-  int64_t batch_size = 0;
-  for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
-    batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+  // TODO(wesm): This calculation doesn't make much sense when we have repeated
+  // schema nodes
+  int64_t records_to_read = 0;
+
+  const FileMetaData& metadata = *reader_->metadata();
+  for (int j = 0; j < metadata.num_row_groups(); j++) {
+    records_to_read += metadata.RowGroup(j)->ColumnChunk(i)->num_values();
   }
 
-  return reader->NextBatch(static_cast<int>(batch_size), out);
+  return reader->NextBatch(records_to_read, out);
 }
 
 Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
   std::unique_ptr<ColumnReader> flat_column_reader;
   RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
 
-  int64_t batch_size = 0;
+  int64_t records_to_read = 0;
   for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
-    batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+    records_to_read += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
   }
 
-  return flat_column_reader->NextBatch(static_cast<int>(batch_size), out);
+  return flat_column_reader->NextBatch(records_to_read, out);
 }
 
 Status FileReader::Impl::GetSchema(const std::vector<int>& indices,
@@ -472,16 +407,17 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
   auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
                          this](int i) {
     int column_index = indices[i];
-    int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values();
+    int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
 
     std::unique_ptr<FileColumnIterator> input(
         new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
 
-    std::unique_ptr<ColumnReader::Impl> impl(new PrimitiveImpl(pool_, std::move(input)));
+    std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+        new PrimitiveImpl(pool_, std::move(input)));
     ColumnReader flat_column_reader(std::move(impl));
 
     std::shared_ptr<Array> array;
-    RETURN_NOT_OK(flat_column_reader.NextBatch(static_cast<int>(batch_size), &array));
+    RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
     columns[i] = std::make_shared<Column>(schema->field(i), array);
     return Status::OK();
   };
@@ -642,282 +578,12 @@ const ParquetFileReader* FileReader::parquet_reader() const {
   return impl_->parquet_reader();
 }
 
-template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
-                                           int64_t values_to_read, int64_t* levels_read) {
-  using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
-
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
-  auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
-  int64_t values_read;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
-                                             nullptr, values, &values_read));
-
-  ArrowCType* out_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
-  std::copy(values, values + values_read, out_ptr + valid_bits_idx_);
-  valid_bits_idx_ += values_read;
-
-  return Status::OK();
-}
-
-#define NONNULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)               \
-  template <>                                                                    \
-  Status PrimitiveImpl::ReadNonNullableBatch<ArrowType, ParquetType>(            \
-      TypedColumnReader<ParquetType> * reader, int64_t values_to_read,           \
-      int64_t * levels_read) {                                                   \
-    int64_t values_read;                                                         \
-    CType* out_ptr = reinterpret_cast<CType*>(data_buffer_ptr_);                 \
-    PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(                       \
-                             static_cast<int>(values_to_read), nullptr, nullptr, \
-                             out_ptr + valid_bits_idx_, &values_read));          \
-                                                                                 \
-    valid_bits_idx_ += values_read;                                              \
-                                                                                 \
-    return Status::OK();                                                         \
-  }
-
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
-    TypedColumnReader<Int96Type>* reader, int64_t values_to_read, int64_t* levels_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
-  auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
-  int64_t values_read;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
-                                             nullptr, values, &values_read));
-
-  int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
-  for (int64_t i = 0; i < values_read; i++) {
-    *out_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
-  }
-  valid_bits_idx_ += values_read;
-
-  return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
-    TypedColumnReader<Int32Type>* reader, int64_t values_to_read, int64_t* levels_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
-  auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
-  int64_t values_read;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
-                                             nullptr, values, &values_read));
-
-  int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
-  for (int64_t i = 0; i < values_read; i++) {
-    *out_ptr++ = static_cast<int64_t>(values[i]) * 86400000;
-  }
-  valid_bits_idx_ += values_read;
-
-  return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
-    TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
-    int64_t* levels_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
-  auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
-  int64_t values_read;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
-                                             nullptr, values, &values_read));
-
-  for (int64_t i = 0; i < values_read; i++) {
-    if (values[i]) {
-      ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_);
-    }
-    valid_bits_idx_++;
-  }
-
-  return Status::OK();
-}
-
-template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader,
-                                        int16_t* def_levels, int16_t* rep_levels,
-                                        int64_t values_to_read, int64_t* levels_read,
-                                        int64_t* values_read) {
-  using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
-
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
-  auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
-  int64_t null_count;
-  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
-      static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
-      valid_bits_idx_, levels_read, values_read, &null_count));
-
-  auto data_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
-  INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
-
-  for (int64_t i = 0; i < *values_read; i++) {
-    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
-      data_ptr[valid_bits_idx_ + i] = values[i];
-    }
-    READ_NEXT_BITSET(valid_bits_ptr_);
-  }
-  null_count_ += null_count;
-  valid_bits_idx_ += *values_read;
-
-  return Status::OK();
-}
-
-#define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)                    \
-  template <>                                                                      \
-  Status PrimitiveImpl::ReadNullableBatch<ArrowType, ParquetType>(                 \
-      TypedColumnReader<ParquetType> * reader, int16_t * def_levels,               \
-      int16_t * rep_levels, int64_t values_to_read, int64_t * levels_read,         \
-      int64_t * values_read) {                                                     \
-    auto data_ptr = reinterpret_cast<CType*>(data_buffer_ptr_);                    \
-    int64_t null_count;                                                            \
-    PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(                                  \
-        static_cast<int>(values_to_read), def_levels, rep_levels,                  \
-        data_ptr + valid_bits_idx_, valid_bits_ptr_, valid_bits_idx_, levels_read, \
-        values_read, &null_count));                                                \
-                                                                                   \
-    valid_bits_idx_ += *values_read;                                               \
-    null_count_ += null_count;                                                     \
-                                                                                   \
-    return Status::OK();                                                           \
-  }
-
-NULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
-NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
-NULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
-    TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int16_t* rep_levels,
-    int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
-  auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
-  int64_t null_count;
-  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
-      static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
-      valid_bits_idx_, levels_read, values_read, &null_count));
-
-  auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
-  INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
-  for (int64_t i = 0; i < *values_read; i++) {
-    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
-      data_ptr[valid_bits_idx_ + i] = impala_timestamp_to_nanoseconds(values[i]);
-    }
-    READ_NEXT_BITSET(valid_bits_ptr_);
-  }
-  null_count_ += null_count;
-  valid_bits_idx_ += *values_read;
-
-  return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
-    TypedColumnReader<Int32Type>* reader, int16_t* def_levels, int16_t* rep_levels,
-    int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
-  auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
-  int64_t null_count;
-  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
-      static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
-      valid_bits_idx_, levels_read, values_read, &null_count));
-
-  auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
-  INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
-  for (int64_t i = 0; i < *values_read; i++) {
-    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
-      data_ptr[valid_bits_idx_ + i] = static_cast<int64_t>(values[i]) * 86400000;
-    }
-    READ_NEXT_BITSET(valid_bits_ptr_);
-  }
-  null_count_ += null_count;
-  valid_bits_idx_ += *values_read;
-
-  return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
-    TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels,
-    int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
-  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
-  auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
-  int64_t null_count;
-  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
-      static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
-      valid_bits_idx_, levels_read, values_read, &null_count));
-
-  INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
-  for (int64_t i = 0; i < *values_read; i++) {
-    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
-      if (values[i]) {
-        ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_ + i);
-      }
-    }
-    READ_NEXT_BITSET(valid_bits_ptr_);
-  }
-  valid_bits_idx_ += *values_read;
-  null_count_ += null_count;
-
-  return Status::OK();
-}
-
-template <typename ArrowType>
-Status PrimitiveImpl::InitDataBuffer(int batch_size) {
-  using ArrowCType = typename ArrowType::c_type;
-  data_buffer_ = std::make_shared<PoolBuffer>(pool_);
-  RETURN_NOT_OK(data_buffer_->Resize(batch_size * sizeof(ArrowCType), false));
-  data_buffer_ptr_ = data_buffer_->mutable_data();
-
-  return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
-  data_buffer_ = std::make_shared<PoolBuffer>(pool_);
-  RETURN_NOT_OK(data_buffer_->Resize(::arrow::BitUtil::CeilByte(batch_size) / 8, false));
-  data_buffer_ptr_ = data_buffer_->mutable_data();
-  memset(data_buffer_ptr_, 0, data_buffer_->size());
-
-  return Status::OK();
-}
-
-Status PrimitiveImpl::InitValidBits(int batch_size) {
-  valid_bits_idx_ = 0;
-  if (descr_->max_definition_level() > 0) {
-    int valid_bits_size =
-        static_cast<int>(::arrow::BitUtil::CeilByte(batch_size + 1)) / 8;
-    valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
-    RETURN_NOT_OK(valid_bits_buffer_->Resize(valid_bits_size, false));
-    valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
-    memset(valid_bits_ptr_, 0, valid_bits_size);
-    null_count_ = 0;
-  }
-  return Status::OK();
-}
+template <typename ParquetType>
+Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<Array>* array) {
+  const int16_t* def_levels = record_reader_->def_levels();
+  const int16_t* rep_levels = record_reader_->rep_levels();
+  const int64_t total_levels_read = record_reader_->levels_position();
 
-Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
-                                        const int16_t* rep_levels,
-                                        int64_t total_levels_read,
-                                        std::shared_ptr<Array>* array) {
   std::shared_ptr<::arrow::Schema> arrow_schema;
   RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
                                   input_->metadata()->key_value_metadata(),
@@ -1021,8 +687,8 @@ Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
 
     std::shared_ptr<Array> output(*array);
     for (int64_t j = list_depth - 1; j >= 0; j--) {
-      auto list_type = std::make_shared<::arrow::ListType>(
-          std::make_shared<Field>("item", output->type(), nullable[j + 1]));
+      auto list_type =
+          ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
       output = std::make_shared<::arrow::ListArray>(
           list_type, list_lengths[j], offsets[j], output, valid_bits[j], null_counts[j]);
     }
@@ -1032,346 +698,291 @@ Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
 }
 
 template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out) {
+struct supports_fast_path_impl {
   using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+  static constexpr bool value = std::is_same<ArrowCType, ParquetCType>::value;
+};
 
-  int values_to_read = batch_size;
-  int total_levels_read = 0;
-  RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size));
-  RETURN_NOT_OK(InitValidBits(batch_size));
-  if (descr_->max_definition_level() > 0) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  if (descr_->max_repetition_level() > 0) {
-    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
-  while ((values_to_read > 0) && column_reader_) {
-    auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
-    int64_t values_read;
-    int64_t levels_read;
-    if (descr_->max_definition_level() == 0) {
-      RETURN_NOT_OK((ReadNonNullableBatch<ArrowType, ParquetType>(reader, values_to_read,
-                                                                  &values_read)));
+template <typename ArrowType>
+struct supports_fast_path_impl<ArrowType, ByteArrayType> {
+  static constexpr bool value = false;
+};
+
+template <typename ArrowType>
+struct supports_fast_path_impl<ArrowType, FLBAType> {
+  static constexpr bool value = false;
+};
+
+template <typename ArrowType, typename ParquetType>
+using supports_fast_path =
+    typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
+
+template <typename ArrowType, typename ParquetType, typename Enable = void>
+struct TransferFunctor {
+  using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    int64_t length = reader->values_written();
+    std::shared_ptr<Buffer> data;
+    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));
+
+    auto values = reinterpret_cast<const ParquetCType*>(reader->values());
+    auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
+    std::copy(values, values + length, out_ptr);
+
+    if (reader->nullable_values()) {
+      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      *out = std::make_shared<ArrayType<ArrowType>>(type, length, data, is_valid,
+                                                    reader->null_count());
     } else {
-      // As per the definition and checks for flat (list) columns:
-      // descr_->max_definition_level() > 0, <= 3
-      RETURN_NOT_OK((ReadNullableBatch<ArrowType, ParquetType>(
-          reader, def_levels + total_levels_read, rep_levels + total_levels_read,
-          values_to_read, &levels_read, &values_read)));
-      total_levels_read += static_cast<int>(levels_read);
-    }
-    values_to_read -= static_cast<int>(values_read);
-    if (!column_reader_->HasNext()) {
-      NextRowGroup();
+      *out = std::make_shared<ArrayType<ArrowType>>(type, length, data);
     }
+    return Status::OK();
   }
+};
 
-  // Shrink arrays as they may be larger than the output.
-  RETURN_NOT_OK(data_buffer_->Resize(valid_bits_idx_ * sizeof(ArrowCType)));
-  if (descr_->max_definition_level() > 0) {
-    if (valid_bits_idx_ < batch_size * 0.8) {
-      RETURN_NOT_OK(valid_bits_buffer_->Resize(
-          ::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8, false));
+template <typename ArrowType, typename ParquetType>
+struct TransferFunctor<ArrowType, ParquetType,
+                       supports_fast_path<ArrowType, ParquetType>> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    int64_t length = reader->values_written();
+    std::shared_ptr<PoolBuffer> values = reader->ReleaseValues();
+
+    if (reader->nullable_values()) {
+      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      *out = std::make_shared<ArrayType<ArrowType>>(type, length, values, is_valid,
+                                                    reader->null_count());
+    } else {
+      *out = std::make_shared<ArrayType<ArrowType>>(type, length, values);
     }
-    *out = std::make_shared<ArrayType<ArrowType>>(
-        field_->type(), valid_bits_idx_, data_buffer_, valid_bits_buffer_, null_count_);
-    // Release the ownership as the Buffer is now part of a new Array
-    valid_bits_buffer_.reset();
-  } else {
-    *out = std::make_shared<ArrayType<ArrowType>>(field_->type(), valid_bits_idx_,
-                                                  data_buffer_);
+    return Status::OK();
   }
-  // Release the ownership as the Buffer is now part of a new Array
-  data_buffer_.reset();
-
-  // Check if we should transform this array into a list array.
-  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
+};
 
 template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
-    int batch_size, std::shared_ptr<Array>* out) {
-  int values_to_read = batch_size;
-  int total_levels_read = 0;
-  RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size));
-  RETURN_NOT_OK(InitValidBits(batch_size));
-  if (descr_->max_definition_level() > 0) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  if (descr_->max_repetition_level() > 0) {
-    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
-  while ((values_to_read > 0) && column_reader_) {
-    auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
-    int64_t values_read;
-    int64_t levels_read;
-    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-    if (descr_->max_definition_level() == 0) {
-      RETURN_NOT_OK((ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
-          reader, values_to_read, &values_read)));
-    } else {
-      // As per the definition and checks for flat columns:
-      // descr_->max_definition_level() == 1
-      RETURN_NOT_OK((ReadNullableBatch<::arrow::BooleanType, BooleanType>(
-          reader, def_levels + total_levels_read, rep_levels + total_levels_read,
-          values_to_read, &levels_read, &values_read)));
-      total_levels_read += static_cast<int>(levels_read);
-    }
-    values_to_read -= static_cast<int>(values_read);
-    if (!column_reader_->HasNext()) {
-      NextRowGroup();
+struct TransferFunctor<::arrow::BooleanType, BooleanType> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    int64_t length = reader->values_written();
+    std::shared_ptr<Buffer> data;
+
+    const int64_t buffer_size = BytesForBits(length);
+    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, buffer_size, &data));
+
+    // Transfer boolean values to packed bitmap
+    auto values = reinterpret_cast<const bool*>(reader->values());
+    uint8_t* data_ptr = data->mutable_data();
+    memset(data_ptr, 0, buffer_size);
+
+    for (int64_t i = 0; i < length; i++) {
+      if (values[i]) {
+        ::arrow::BitUtil::SetBit(data_ptr, i);
+      }
     }
-  }
 
-  if (descr_->max_definition_level() > 0) {
-    // TODO: Shrink arrays in the case they are too large
-    if (valid_bits_idx_ < batch_size * 0.8) {
-      // Shrink arrays as they are larger than the output.
-      // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
-      //    without the need for a copy. Given a decent underlying allocator this
-      //    should still free some underlying pages to the OS.
-
-      auto data_buffer = std::make_shared<PoolBuffer>(pool_);
-      RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(bool)));
-      memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
-      data_buffer_ = data_buffer;
-
-      auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
-      RETURN_NOT_OK(
-          valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
-      memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
-             valid_bits_buffer->size());
-      valid_bits_buffer_ = valid_bits_buffer;
+    if (reader->nullable_values()) {
+      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      RETURN_NOT_OK(is_valid->Resize(BytesForBits(length), false));
+      *out = std::make_shared<BooleanArray>(type, length, data, is_valid,
+                                            reader->null_count());
+    } else {
+      *out = std::make_shared<BooleanArray>(type, length, data);
     }
-    *out = std::make_shared<BooleanArray>(field_->type(), valid_bits_idx_, data_buffer_,
-                                          valid_bits_buffer_, null_count_);
-    // Release the ownership
-    data_buffer_.reset();
-    valid_bits_buffer_.reset();
-  } else {
-    *out = std::make_shared<BooleanArray>(field_->type(), valid_bits_idx_, data_buffer_);
-    data_buffer_.reset();
+    return Status::OK();
   }
+};
 
-  // Check if we should transform this array into a list array.
-  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
-
-template <typename ArrowType>
-Status PrimitiveImpl::ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out) {
-  using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
+template <>
+struct TransferFunctor<::arrow::TimestampType, Int96Type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    int64_t length = reader->values_written();
+    auto values = reinterpret_cast<const Int96*>(reader->values());
+
+    std::shared_ptr<Buffer> data;
+    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
+
+    auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+    for (int64_t i = 0; i < length; i++) {
+      *data_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
+    }
 
-  int total_levels_read = 0;
-  if (descr_->max_definition_level() > 0) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  if (descr_->max_repetition_level() > 0) {
-    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
-  int values_to_read = batch_size;
-  BuilderType builder(pool_);
-  while ((values_to_read > 0) && column_reader_) {
-    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ByteArray), false));
-    auto reader = dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
-    int64_t values_read;
-    int64_t levels_read;
-    auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
-    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
-                             values_to_read, def_levels + total_levels_read,
-                             rep_levels + total_levels_read, values, &values_read));
-    values_to_read -= static_cast<int>(levels_read);
-    if (descr_->max_definition_level() == 0) {
-      for (int64_t i = 0; i < levels_read; i++) {
-        RETURN_NOT_OK(
-            builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
-      }
+    if (reader->nullable_values()) {
+      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      *out = std::make_shared<TimestampArray>(type, length, data, is_valid,
+                                              reader->null_count());
     } else {
-      // descr_->max_definition_level() > 0
-      int values_idx = 0;
-      int nullable_elements = descr_->schema_node()->is_optional();
-      for (int64_t i = 0; i < levels_read; i++) {
-        if (nullable_elements &&
-            (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
-          RETURN_NOT_OK(builder.AppendNull());
-        } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
-          RETURN_NOT_OK(
-              builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
-                             values[values_idx].len));
-          values_idx++;
-        }
-      }
-      total_levels_read += static_cast<int>(levels_read);
-    }
-    if (!column_reader_->HasNext()) {
-      NextRowGroup();
+      *out = std::make_shared<TimestampArray>(type, length, data);
     }
+
+    return Status::OK();
   }
+};
 
-  RETURN_NOT_OK(builder.Finish(out));
-  // Check if we should transform this array into a list array.
-  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
+template <>
+struct TransferFunctor<::arrow::Date64Type, Int32Type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    int64_t length = reader->values_written();
+    auto values = reinterpret_cast<const int32_t*>(reader->values());
+
+    std::shared_ptr<Buffer> data;
+    RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
+    auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+
+    for (int64_t i = 0; i < length; i++) {
+      *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsInADay;
+    }
 
-template <typename ArrowType>
-Status PrimitiveImpl::ReadFLBABatch(int batch_size, int byte_width,
-                                    std::shared_ptr<Array>* out) {
-  using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
-  int total_levels_read = 0;
-  if (descr_->max_definition_level() > 0) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  if (descr_->max_repetition_level() > 0) {
-    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
-  }
-  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
-  int values_to_read = batch_size;
-  BuilderType builder(::arrow::fixed_size_binary(byte_width), pool_);
-  while ((values_to_read > 0) && column_reader_) {
-    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(FLBA), false));
-    auto reader = dynamic_cast<TypedColumnReader<FLBAType>*>(column_reader_.get());
-    int64_t values_read;
-    int64_t levels_read;
-    auto values = reinterpret_cast<FLBA*>(values_buffer_.mutable_data());
-    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
-                             values_to_read, def_levels + total_levels_read,
-                             rep_levels + total_levels_read, values, &values_read));
-    values_to_read -= static_cast<int>(levels_read);
-    if (descr_->max_definition_level() == 0) {
-      for (int64_t i = 0; i < levels_read; i++) {
-        RETURN_NOT_OK(builder.Append(values[i].ptr));
-      }
+    if (reader->nullable_values()) {
+      std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+      *out = std::make_shared<::arrow::Date64Array>(type, length, data, is_valid,
+                                                    reader->null_count());
     } else {
-      int values_idx = 0;
-      int nullable_elements = descr_->schema_node()->is_optional();
-      for (int64_t i = 0; i < levels_read; i++) {
-        if (nullable_elements &&
-            (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
-          RETURN_NOT_OK(builder.AppendNull());
-        } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
-          RETURN_NOT_OK(builder.Append(values[values_idx].ptr));
-          values_idx++;
-        }
-      }
-      total_levels_read += static_cast<int>(levels_read);
-    }
-    if (!column_reader_->HasNext()) {
-      NextRowGroup();
+      *out = std::make_shared<::arrow::Date64Array>(type, length, data);
     }
+    return Status::OK();
   }
+};
 
-  RETURN_NOT_OK(builder.Finish(out));
-  // Check if we should transform this array into a list array.
-  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
-
-template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
-    int batch_size, std::shared_ptr<Array>* out) {
-  return ReadByteArrayBatch<::arrow::BinaryType>(batch_size, out);
-}
+template <typename ArrowType, typename ParquetType>
+struct TransferFunctor<
+    ArrowType, ParquetType,
+    typename std::enable_if<std::is_same<ParquetType, ByteArrayType>::value ||
+                            std::is_same<ParquetType, FLBAType>::value>::type> {
+  Status operator()(RecordReader* reader, MemoryPool* pool,
+                    const std::shared_ptr<::arrow::DataType>& type,
+                    std::shared_ptr<Array>* out) {
+    RETURN_NOT_OK(reader->builder()->Finish(out));
+
+    if (type->id() == ::arrow::Type::STRING) {
+      // Convert from BINARY type to STRING
+      auto new_data = (*out)->data()->ShallowCopy();
+      new_data->type = type;
+      RETURN_NOT_OK(::arrow::MakeArray(new_data, out));
+    }
+    return Status::OK();
+  }
+};
 
-template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
-    int batch_size, std::shared_ptr<Array>* out) {
-  return ReadByteArrayBatch<::arrow::StringType>(batch_size, out);
-}
+#define TRANSFER_DATA(ArrowType, ParquetType)                            \
+  TransferFunctor<ArrowType, ParquetType> func;                          \
+  RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out)); \
+  RETURN_NOT_OK(WrapIntoListArray<ParquetType>(out))
 
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)              \
-  case ::arrow::Type::ENUM:                                         \
-    return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
-    break;
+#define TRANSFER_CASE(ENUM, ArrowType, ParquetType) \
+  case ::arrow::Type::ENUM: {                       \
+    TRANSFER_DATA(ArrowType, ParquetType);          \
+  } break;
 
-Status PrimitiveImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
-  if (!column_reader_) {
+Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+  if (!record_reader_->HasMoreData()) {
     // Exhausted all row groups.
     *out = nullptr;
     return Status::OK();
   }
 
-  switch (field_->type()->id()) {
-    case ::arrow::Type::NA:
-      *out = std::make_shared<::arrow::NullArray>(batch_size);
-      return Status::OK();
-      break;
-      TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
-      TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
-      TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
-      TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
-      TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
-      TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
-      TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
-      TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
-      TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
-      TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
-      TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
-      TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
-      TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
-      TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
-      TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
-    case ::arrow::Type::FIXED_SIZE_BINARY: {
-      int32_t byte_width =
-          static_cast<::arrow::FixedSizeBinaryType*>(field_->type().get())->byte_width();
-      return ReadFLBABatch<::arrow::FixedSizeBinaryType>(batch_size, byte_width, out);
-      break;
+  if (field_->type()->id() == ::arrow::Type::NA) {
+    *out = std::make_shared<::arrow::NullArray>(records_to_read);
+    return Status::OK();
+  }
+
+  try {
+    // Pre-allocation gives much better performance for flat columns
+    record_reader_->Reserve(records_to_read);
+
+    record_reader_->Reset();
+    while (records_to_read > 0) {
+      if (!record_reader_->HasMoreData()) {
+        break;
+      }
+      int64_t records_read = record_reader_->ReadRecords(records_to_read);
+      records_to_read -= records_read;
+      if (records_read == 0) {
+        NextRowGroup();
+      }
     }
+  } catch (const ::parquet::ParquetException& e) {
+    return ::arrow::Status::IOError(e.what());
+  }
+
+  switch (field_->type()->id()) {
+    TRANSFER_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+    TRANSFER_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+    TRANSFER_CASE(INT8, ::arrow::Int8Type, Int32Type)
+    TRANSFER_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+    TRANSFER_CASE(INT16, ::arrow::Int16Type, Int32Type)
+    TRANSFER_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+    TRANSFER_CASE(INT32, ::arrow::Int32Type, Int32Type)
+    TRANSFER_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+    TRANSFER_CASE(INT64, ::arrow::Int64Type, Int64Type)
+    TRANSFER_CASE(FLOAT, ::arrow::FloatType, FloatType)
+    TRANSFER_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+    TRANSFER_CASE(STRING, ::arrow::StringType, ByteArrayType)
+    TRANSFER_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
+    TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
+    TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
+    TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
     case ::arrow::Type::TIMESTAMP: {
       ::arrow::TimestampType* timestamp_type =
           static_cast<::arrow::TimestampType*>(field_->type().get());
       switch (timestamp_type->unit()) {
         case ::arrow::TimeUnit::MILLI:
-          return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
-          break;
-        case ::arrow::TimeUnit::MICRO:
-          return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
-          break;
-        case ::arrow::TimeUnit::NANO:
-          return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
-          break;
+        case ::arrow::TimeUnit::MICRO: {
+          TRANSFER_DATA(::arrow::TimestampType, Int64Type);
+        } break;
+        case ::arrow::TimeUnit::NANO: {
+          TRANSFER_DATA(::arrow::TimestampType, Int96Type);
+        } break;
         default:
           return Status::NotImplemented("TimeUnit not supported");
       }
       break;
     }
-      TYPED_BATCH_CASE(TIME32, ::arrow::Time32Type, Int32Type)
-      TYPED_BATCH_CASE(TIME64, ::arrow::Time64Type, Int64Type)
+      TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
+      TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
     default:
       std::stringstream ss;
       ss << "No support for reading columns of type " << field_->type()->ToString();
       return Status::NotImplemented(ss.str());
   }
+
+  return Status::OK();
 }
 
-void PrimitiveImpl::NextRowGroup() { column_reader_ = input_->Next(); }
+void PrimitiveImpl::NextRowGroup() {
+  std::unique_ptr<PageReader> page_reader = input_->NextChunk();
+  record_reader_->SetPageReader(std::move(page_reader));
+}
 
-Status PrimitiveImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
-  *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
-  *length = def_levels_buffer_.size() / sizeof(int16_t);
+Status PrimitiveImpl::GetDefLevels(const int16_t** data, size_t* length) {
+  *data = record_reader_->def_levels();
+  *length = record_reader_->levels_written();
   return Status::OK();
 }
 
-Status PrimitiveImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
-  *data = reinterpret_cast<ValueLevelsPtr>(rep_levels_buffer_.data());
-  *length = rep_levels_buffer_.size() / sizeof(int16_t);
+Status PrimitiveImpl::GetRepLevels(const int16_t** data, size_t* length) {
+  *data = record_reader_->rep_levels();
+  *length = record_reader_->levels_written();
   return Status::OK();
 }
 
-ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+ColumnReader::ColumnReader(std::unique_ptr<ColumnReaderImpl> impl)
+    : impl_(std::move(impl)) {}
 
 ColumnReader::~ColumnReader() {}
 
-Status ColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
-  return impl_->NextBatch(batch_size, out);
+Status ColumnReader::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+  return impl_->NextBatch(records_to_read, out);
 }
 
 // StructImpl methods
@@ -1380,7 +991,7 @@ Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out
                                         int64_t* null_count_out) {
   std::shared_ptr<Buffer> null_bitmap;
   auto null_count = 0;
-  ValueLevelsPtr def_levels_data;
+  const int16_t* def_levels_data;
   size_t def_levels_length;
   RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
   RETURN_NOT_OK(GetEmptyBitmap(pool_, def_levels_length, &null_bitmap));
@@ -1402,7 +1013,7 @@ Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out
 
 // TODO(itaiin): Consider caching the results of this calculation -
 //   note that this is only used once for each read for now
-Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
+Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
   *data = nullptr;
   if (children_.size() == 0) {
     // Empty struct
@@ -1411,7 +1022,7 @@ Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
   }
 
   // We have at least one child
-  ValueLevelsPtr child_def_levels;
+  const int16_t* child_def_levels;
   size_t child_length;
   RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
   auto size = child_length * sizeof(int16_t);
@@ -1438,27 +1049,27 @@ Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
           std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_));
     }
   }
-  *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
+  *data = reinterpret_cast<const int16_t*>(def_levels_buffer_.data());
   *length = child_length;
   return Status::OK();
 }
 
-void StructImpl::InitField(const NodePtr& node,
-                           const std::vector<std::shared_ptr<Impl>>& children) {
+void StructImpl::InitField(
+    const Node* node, const std::vector<std::shared_ptr<ColumnReaderImpl>>& children) {
   // Make a shallow node to field conversion from the children fields
   std::vector<std::shared_ptr<::arrow::Field>> fields(children.size());
   for (size_t i = 0; i < children.size(); i++) {
     fields[i] = children[i]->field();
   }
-  auto type = std::make_shared<::arrow::StructType>(fields);
-  field_ = std::make_shared<Field>(node->name(), type);
+  auto type = ::arrow::struct_(fields);
+  field_ = ::arrow::field(node->name(), type);
 }
 
-Status StructImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
+Status StructImpl::GetRepLevels(const int16_t** data, size_t* length) {
   return Status::NotImplemented("GetRepLevels is not implemented for struct");
 }
 
-Status StructImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+Status StructImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
   std::vector<std::shared_ptr<Array>> children_arrays;
   std::shared_ptr<Buffer> null_bitmap;
   int64_t null_count;
@@ -1467,16 +1078,23 @@ Status StructImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
   for (auto& child : children_) {
     std::shared_ptr<Array> child_array;
 
-    RETURN_NOT_OK(child->NextBatch(batch_size, &child_array));
-
+    RETURN_NOT_OK(child->NextBatch(records_to_read, &child_array));
     children_arrays.push_back(child_array);
   }
 
   RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));
 
-  *out = std::make_shared<StructArray>(field()->type(), batch_size, children_arrays,
-                                       null_bitmap, null_count);
+  int64_t struct_length = children_arrays[0]->length();
+  for (size_t i = 1; i < children_arrays.size(); ++i) {
+    if (children_arrays[i]->length() != struct_length) {
+      // TODO(wesm): This should really only occur if the Parquet file is
+      // malformed. Should this be a DCHECK?
+      return Status::Invalid("Struct children had different lengths");
+    }
+  }
 
+  *out = std::make_shared<StructArray>(field()->type(), struct_length, children_arrays,
+                                       null_bitmap, null_count);
   return Status::OK();
 }
 

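A note on the pattern above: TransferFunctor replaces the old per-type
TypedReadBatch specializations with trait-gated dispatch. When the Arrow and
Parquet C types share a memory representation, the RecordReader's value buffer
is released directly into the new Array; otherwise each value is converted
(for example, DATE64 widens every INT32 day count by multiplying with
86400000 milliseconds). Below is a minimal standalone sketch of the same
std::enable_if dispatch, using simplified stand-in types rather than the
actual parquet-cpp classes:

#include <cstdint>
#include <iostream>
#include <type_traits>
#include <vector>

// Primary template: representations differ, so convert element by
// element (analogous to the unspecialized TransferFunctor above).
template <typename ArrowCType, typename ParquetCType, typename Enable = void>
struct Transfer {
  static std::vector<ArrowCType> Run(std::vector<ParquetCType> in) {
    return std::vector<ArrowCType>(in.begin(), in.end());
  }
};

// Specialization chosen when the C types match: hand the buffer through
// as-is (analogous to the ReleaseValues fast path).
template <typename ArrowCType, typename ParquetCType>
struct Transfer<ArrowCType, ParquetCType,
                typename std::enable_if<
                    std::is_same<ArrowCType, ParquetCType>::value>::type> {
  static std::vector<ArrowCType> Run(std::vector<ParquetCType> in) {
    return in;
  }
};

int main() {
  // Conversion path: int64_t output built from int32_t input.
  auto widened = Transfer<int64_t, int32_t>::Run({1, 2, 3});
  // Same-type path: the vector is handed through unchanged.
  auto passed = Transfer<int64_t, int64_t>::Run({4, 5, 6});
  std::cout << widened[0] + passed[0] << std::endl;  // prints 5
}
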
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index ce82375..faaef9a 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -173,7 +173,7 @@ class PARQUET_EXPORT FileReader {
 // might change in the future.
 class PARQUET_EXPORT ColumnReader {
  public:
-  class PARQUET_NO_EXPORT Impl;
+  class PARQUET_NO_EXPORT ColumnReaderImpl;
   virtual ~ColumnReader();
 
   // Scan the next array of the indicated size. The actual size of the
@@ -185,11 +185,11 @@ class PARQUET_EXPORT ColumnReader {
   //
   // Returns Status::OK on a successful read, including if you have exhausted
   // the data available in the file.
-  ::arrow::Status NextBatch(int batch_size, std::shared_ptr<::arrow::Array>* out);
+  ::arrow::Status NextBatch(int64_t batch_size, std::shared_ptr<::arrow::Array>* out);
 
  private:
-  std::unique_ptr<Impl> impl_;
-  explicit ColumnReader(std::unique_ptr<Impl> impl);
+  std::unique_ptr<ColumnReaderImpl> impl_;
+  explicit ColumnReader(std::unique_ptr<ColumnReaderImpl> impl);
 
   friend class FileReader;
   friend class PrimitiveImpl;


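The header diff is the whole user-facing change: NextBatch now takes an
int64_t batch size and the pimpl class was renamed from Impl to
ColumnReaderImpl. A hedged caller sketch against this public API
(FileReader::GetColumn is assumed to keep its usual signature from the same
header; opening the FileReader is elided):

#include <memory>

#include "arrow/api.h"
#include "parquet/arrow/reader.h"

// Drain one column in fixed-size record batches. Per the reader.cc diff
// above, NextBatch keeps returning Status::OK and sets the output array
// to nullptr once the file's row groups are exhausted.
::arrow::Status ScanColumn(parquet::arrow::FileReader* reader, int column_index) {
  std::unique_ptr<parquet::arrow::ColumnReader> column;
  ::arrow::Status st = reader->GetColumn(column_index, &column);
  if (!st.ok()) return st;

  std::shared_ptr<::arrow::Array> batch;
  do {
    st = column->NextBatch(4096, &batch);  // batch size is now int64_t
    if (!st.ok()) return st;
  } while (batch != nullptr);
  return ::arrow::Status::OK();
}

Because struct and NullArray columns flow through the same NextBatch entry
point, a caller does not need to distinguish flat from nested columns.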