parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject [2/2] parquet-cpp git commit: PARQUET-834: Support I/O of arrow::ListArray
Date Thu, 02 Feb 2017 21:14:39 GMT
PARQUET-834: Support I/O of arrow::ListArray

Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>

Closes #229 from xhochy/PARQUET-834 and squashes the following commits:

ba68dec [Korn, Uwe] Remove signed/unsigned comparisons
0967992 [Korn, Uwe] Remove signed/unsigned comparisons
05979c3 [Korn, Uwe] Add missing RETURN_NOT_OK
6484e86 [Korn, Uwe] Remove unused member
e58a4e9 [Korn, Uwe] ListofLists finally work
e8267c7 [Korn, Uwe] Add test for 2 level List
f59da0c [Korn, Uwe] No need to distinguish anymore between different array types
1dc3bbe [Korn, Uwe] Determine values inputs
0ec90e9 [Korn, Uwe] Style fixes
ee609e5 [Korn, Uwe] Unify level generation
17cfe15 [Korn, Uwe] Write lists of any depth
75a4871 [Korn, Uwe] Directly use TypedWriteBatch
89b3e35 [Korn, Uwe] Remove unused import
ccdf25c [Korn, Uwe] Use TypedWriteBatch for all list cases
d7e09cf [Korn, Uwe] Reuse TypedWriteBatch for lists
d1b82d3 [Korn, Uwe] Activate fast path for timestamp type
0b98475 [Korn, Uwe] TypedWriteBatch should be applicable for all definition levels
34bea2f [Korn, Uwe] Push level generation one level up
89aaa8c [Korn, Uwe] Remove empty if section
0cda75b [Korn, Uwe] Refactor level generation into separate method
c50f9f7 [Korn, Uwe] Adjust WriteSpaced to behave as ReadSpaced
c76b7f3 [Korn, Uwe] Simplify list unittest
fbfe2a4 [Korn, Uwe] Review comments
bcef2b9 [Korn, Uwe] Make compatible schema detection more readable
be05282 [Korn, Uwe] Reuse repeated test code
856f75c [Korn, Uwe] Fix signed comparison
75c920d [Korn, Uwe] Correctly handle empty lists
93e92ab [Korn, Uwe] Fix benchmark compilation
0201578 [Korn, Uwe] Remove dead ASSERTs
e78cb13 [Korn, Uwe] Add support for lists with max_definition_level = 2
f43effc [Korn, Uwe] Remove 'Flat' from the reader API
e3b7f58 [Korn, Uwe] Update arrow hash
e44c1d8 [Korn, Uwe] Support boolean lists of lists
f46c056 [Korn, Uwe] Add UINT32 support
d20f2be [Korn, Uwe] Read string and binary listarrays
8d6b08b [Korn, Uwe] Remove 'Flat' from the writer API
f9ab91d [Korn, Uwe] PARQUET-834: Support I/O of arrow::ListArray


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/ad56e7ae
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/ad56e7ae
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/ad56e7ae

Branch: refs/heads/master
Commit: ad56e7aea9acbcf141fb72ef2085f63a278f3bc1
Parents: b1c85ca
Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>
Authored: Thu Feb 2 16:14:30 2017 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Thu Feb 2 16:14:30 2017 -0500

----------------------------------------------------------------------
 cmake_modules/ThirdpartyToolchain.cmake         |   2 +-
 .../arrow/arrow-reader-writer-benchmark.cc      |   8 +-
 src/parquet/arrow/arrow-reader-writer-test.cc   | 118 +++--
 src/parquet/arrow/reader.cc                     | 468 ++++++++++-------
 src/parquet/arrow/reader.h                      |  16 +-
 src/parquet/arrow/test-util.h                   |  39 ++
 src/parquet/arrow/writer.cc                     | 514 +++++++++++++------
 src/parquet/arrow/writer.h                      |  10 +-
 src/parquet/column/column-reader-test.cc        |  43 +-
 src/parquet/column/reader.h                     |  77 ++-
 src/parquet/column/statistics.cc                |   4 +-
 src/parquet/column/writer.cc                    |  24 +-
 src/parquet/column/writer.h                     |   2 +-
 src/parquet/encodings/dictionary-encoding.h     |   2 +-
 src/parquet/encodings/encoder.h                 |   4 +-
 src/parquet/file/metadata.cc                    |   6 +
 src/parquet/file/metadata.h                     |   1 +
 src/parquet/file/writer-internal.cc             |   4 +
 src/parquet/file/writer-internal.h              |   1 +
 src/parquet/file/writer.cc                      |   4 +
 src/parquet/file/writer.h                       |   3 +
 src/parquet/util/bit-util.h                     |  22 +-
 src/parquet/util/rle-encoding.h                 |   6 +-
 23 files changed, 930 insertions(+), 448 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/cmake_modules/ThirdpartyToolchain.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index 8fc1b78..8b052df 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -22,7 +22,7 @@ set(THRIFT_VERSION "0.9.1")
 
 # Brotli 0.5.2 does not install headers/libraries yet, but 0.6.0.dev does
 set(BROTLI_VERSION "5db62dcc9d386579609540cdf8869e95ad334bbd")
-set(ARROW_VERSION "085c8754b0ab2da7fcd245fc88bc4de9a6806a4c")
+set(ARROW_VERSION "4226adfbc6b3dff10b3fe7a6691b30bcc94140bd")
 
 # find boost headers and libs
 set(Boost_DEBUG TRUE)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/arrow-reader-writer-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index cf90ebc..3f2a688 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -32,7 +32,7 @@ using arrow::NumericBuilder;
 namespace parquet {
 
 using arrow::FileReader;
-using arrow::WriteFlatTable;
+using arrow::WriteTable;
 using schema::PrimitiveNode;
 
 namespace benchmark {
@@ -109,7 +109,7 @@ static void BM_WriteColumn(::benchmark::State& state) {
 
   while (state.KeepRunning()) {
     auto output = std::make_shared<InMemoryOutputStream>();
-    WriteFlatTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE);
+    WriteTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE);
   }
   SetBytesProcessed<nullable, ParquetType>(state);
 }
@@ -128,7 +128,7 @@ static void BM_ReadColumn(::benchmark::State& state) {
   std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128);
   std::shared_ptr<::arrow::Table> table = TableFromVector<nullable, ParquetType>(values);
   auto output = std::make_shared<InMemoryOutputStream>();
-  WriteFlatTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE);
+  WriteTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE);
   std::shared_ptr<Buffer> buffer = output->GetBuffer();
 
   while (state.KeepRunning()) {
@@ -136,7 +136,7 @@ static void BM_ReadColumn(::benchmark::State& state) {
         ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
     FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
     std::shared_ptr<::arrow::Table> table;
-    filereader.ReadFlatTable(&table);
+    filereader.ReadTable(&table);
   }
   SetBytesProcessed<nullable, ParquetType>(state);
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index d681e57..619d5a3 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -36,6 +36,7 @@ using arrow::Column;
 using arrow::ChunkedArray;
 using arrow::default_memory_pool;
 using arrow::io::BufferReader;
+using arrow::ListArray;
 using arrow::PoolBuffer;
 using arrow::PrimitiveArray;
 using arrow::Status;
@@ -216,8 +217,8 @@ class TestParquetIO : public ::testing::Test {
 
   void ReadSingleColumnFile(
       std::unique_ptr<FileReader> file_reader, std::shared_ptr<Array>* out) {
-    std::unique_ptr<FlatColumnReader> column_reader;
-    ASSERT_OK_NO_THROW(file_reader->GetFlatColumn(0, &column_reader));
+    std::unique_ptr<ColumnReader> column_reader;
+    ASSERT_OK_NO_THROW(file_reader->GetColumn(0, &column_reader));
     ASSERT_NE(nullptr, column_reader.get());
 
     ASSERT_OK(column_reader->NextBatch(SMALL_SIZE, out));
@@ -235,7 +236,7 @@ class TestParquetIO : public ::testing::Test {
 
   void ReadTableFromFile(
       std::unique_ptr<FileReader> reader, std::shared_ptr<Table>* out) {
-    ASSERT_OK_NO_THROW(reader->ReadFlatTable(out));
+    ASSERT_OK_NO_THROW(reader->ReadTable(out));
     ASSERT_NE(nullptr, out->get());
   }
 
@@ -252,12 +253,47 @@ class TestParquetIO : public ::testing::Test {
     ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
   }
 
+  void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements,
+      int64_t null_count, std::shared_ptr<Table>* out) {
+    std::shared_ptr<Array> values;
+    ASSERT_OK(NullableArray<TestType>(
+        size * size, nullable_elements ? null_count : 0, kDefaultSeed, &values));
+    std::shared_ptr<ListArray> lists;
+    ASSERT_OK(MakeListArary(
+        values, size, nullable_lists ? null_count : 0, nullable_elements, &lists));
+    *out = MakeSimpleTable(lists, nullable_lists);
+  }
+
+  void PrepareListOfListTable(int64_t size, bool nullable_parent_lists,
+      bool nullable_lists, bool nullable_elements, int64_t null_count,
+      std::shared_ptr<Table>* out) {
+    std::shared_ptr<Array> values;
+    ASSERT_OK(NullableArray<TestType>(
+        size * 6, nullable_elements ? null_count : 0, kDefaultSeed, &values));
+    std::shared_ptr<ListArray> lists;
+    ASSERT_OK(MakeListArary(
+        values, size * 3, nullable_lists ? null_count : 0, nullable_elements, &lists));
+    std::shared_ptr<ListArray> parent_lists;
+    ASSERT_OK(MakeListArary(lists, size, nullable_parent_lists ? null_count : 0,
+        nullable_lists, &parent_lists));
+    *out = MakeSimpleTable(parent_lists, nullable_parent_lists);
+  }
+
+  void WriteReadAndCheckSingleColumnTable(const std::shared_ptr<Table>& table) {
+    std::shared_ptr<Array> values = table->column(0)->data()->chunk(0);
+    this->sink_ = std::make_shared<InMemoryOutputStream>();
+    ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(),
+        this->sink_, values->length(), default_writer_properties()));
+
+    this->ReadAndCheckSingleColumnTable(values);
+  }
+
   template <typename ArrayType>
-  void WriteFlatColumn(const std::shared_ptr<GroupNode>& schema,
+  void WriteColumn(const std::shared_ptr<GroupNode>& schema,
       const std::shared_ptr<ArrayType>& values) {
     FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema));
     ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
-    ASSERT_OK_NO_THROW(writer.WriteFlatColumnChunk(values.get()));
+    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get()));
     ASSERT_OK_NO_THROW(writer.Close());
   }
 
@@ -282,7 +318,7 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
   ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
 
   std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
-  this->WriteFlatColumn(schema, values);
+  this->WriteColumn(schema, values);
 
   this->ReadAndCheckSingleColumnFile(values.get());
 }
@@ -292,8 +328,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
   ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, false);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
-      this->sink_, values->length(), default_writer_properties()));
+  ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_,
+      values->length(), default_writer_properties()));
 
   std::shared_ptr<Table> out;
   std::unique_ptr<FileReader> reader;
@@ -314,7 +350,7 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
   ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
 
   std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
-  this->WriteFlatColumn(schema, values);
+  this->WriteColumn(schema, values);
 
   this->ReadAndCheckSingleColumnFile(values.get());
 }
@@ -325,11 +361,37 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
 
   ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, true);
-  this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
-      this->sink_, values->length(), default_writer_properties()));
+  this->WriteReadAndCheckSingleColumnTable(table);
+}
 
-  this->ReadAndCheckSingleColumnTable(values);
+TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) {
+  std::shared_ptr<Table> table;
+  this->PrepareListTable(SMALL_SIZE, true, true, 10, &table);
+  this->WriteReadAndCheckSingleColumnTable(table);
+}
+
+TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) {
+  std::shared_ptr<Table> table;
+  this->PrepareListTable(SMALL_SIZE, false, true, 10, &table);
+  this->WriteReadAndCheckSingleColumnTable(table);
+}
+
+TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) {
+  std::shared_ptr<Table> table;
+  this->PrepareListTable(SMALL_SIZE, true, false, 10, &table);
+  this->WriteReadAndCheckSingleColumnTable(table);
+}
+
+TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) {
+  std::shared_ptr<Table> table;
+  this->PrepareListTable(SMALL_SIZE, false, false, 0, &table);
+  this->WriteReadAndCheckSingleColumnTable(table);
+}
+
+TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) {
+  std::shared_ptr<Table> table;
+  this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table);
+  this->WriteReadAndCheckSingleColumnTable(table);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
@@ -341,8 +403,7 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
   FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
     ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
-    ASSERT_OK_NO_THROW(
-        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get(), i * chunk_size, chunk_size));
   }
   ASSERT_OK_NO_THROW(writer.Close());
 
@@ -354,7 +415,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
   ASSERT_OK(NonNullArray<TypeParam>(LARGE_SIZE, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, false);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteFlatTable(
+  ASSERT_OK_NO_THROW(WriteTable(
       table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
 
   this->ReadAndCheckSingleColumnTable(values);
@@ -370,8 +431,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
   {
     // BufferOutputStream closed on gc
     auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer);
-    ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), default_memory_pool(), arrow_sink_,
-        512, default_writer_properties()));
+    ASSERT_OK_NO_THROW(WriteTable(table.get(), default_memory_pool(), arrow_sink_, 512,
+        default_writer_properties()));
 
     // XXX: Remove this after ARROW-455 completed
     ASSERT_OK(arrow_sink_->Close());
@@ -402,8 +463,7 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
   FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
     ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
-    ASSERT_OK_NO_THROW(
-        writer.WriteFlatColumnChunk(values.get(), i * chunk_size, chunk_size));
+    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get(), i * chunk_size, chunk_size));
   }
   ASSERT_OK_NO_THROW(writer.Close());
 
@@ -417,8 +477,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
   ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, true);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
-      this->sink_, 512, default_writer_properties()));
+  ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_,
+      512, default_writer_properties()));
 
   this->ReadAndCheckSingleColumnTable(values);
 }
@@ -491,7 +551,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
           .version(ParquetVersion::PARQUET_2_0)
           ->build();
   ASSERT_OK_NO_THROW(
-      WriteFlatTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+      WriteTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
   this->ReadAndCheckSingleColumnTable(values);
 }
 
@@ -512,7 +572,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
       ::parquet::WriterProperties::Builder()
           .version(ParquetVersion::PARQUET_1_0)
           ->build();
-  ASSERT_OK_NO_THROW(WriteFlatTable(
+  ASSERT_OK_NO_THROW(WriteTable(
       table.get(), ::arrow::default_memory_pool(), this->sink_, 512, properties));
 
   std::shared_ptr<Array> expected_values;
@@ -544,8 +604,8 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
   ASSERT_OK(builder.Finish(&values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, false);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteFlatTable(table.get(), ::arrow::default_memory_pool(),
-      this->sink_, values->length(), default_writer_properties()));
+  ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_,
+      values->length(), default_writer_properties()));
 
   std::shared_ptr<Table> out;
   std::unique_ptr<FileReader> reader;
@@ -675,7 +735,7 @@ void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
     const std::vector<int>& column_subset, std::shared_ptr<Table>* out) {
   auto sink = std::make_shared<InMemoryOutputStream>();
 
-  ASSERT_OK_NO_THROW(WriteFlatTable(
+  ASSERT_OK_NO_THROW(WriteTable(
       table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));
 
   std::shared_ptr<Buffer> buffer = sink->GetBuffer();
@@ -687,10 +747,10 @@ void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
   reader->set_num_threads(num_threads);
 
   if (column_subset.size() > 0) {
-    ASSERT_OK_NO_THROW(reader->ReadFlatTable(column_subset, out));
+    ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
   } else {
     // Read everything
-    ASSERT_OK_NO_THROW(reader->ReadFlatTable(out));
+    ASSERT_OK_NO_THROW(reader->ReadTable(out));
   }
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index a60d0b2..5059494 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -34,8 +34,11 @@
 #include "arrow/util/bit-util.h"
 
 using arrow::Array;
+using arrow::BooleanArray;
 using arrow::Column;
 using arrow::Field;
+using arrow::Int32Array;
+using arrow::ListArray;
 using arrow::MemoryPool;
 using arrow::PoolBuffer;
 using arrow::Status;
@@ -65,11 +68,11 @@ class FileReader::Impl {
   virtual ~Impl() {}
 
   bool CheckForFlatColumn(const ColumnDescriptor* descr);
-  Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
-  Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
-  Status ReadFlatTable(std::shared_ptr<Table>* out);
-  Status ReadFlatTable(
-      const std::vector<int>& column_indices, std::shared_ptr<Table>* out);
+  bool CheckForFlatListColumn(const ColumnDescriptor* descr);
+  Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
+  Status ReadColumn(int i, std::shared_ptr<Array>* out);
+  Status ReadTable(std::shared_ptr<Table>* out);
+  Status ReadTable(const std::vector<int>& column_indices, std::shared_ptr<Table>* out);
   const ParquetFileReader* parquet_reader() const { return reader_.get(); }
 
   void set_num_threads(int num_threads) { num_threads_ = num_threads; }
@@ -81,7 +84,7 @@ class FileReader::Impl {
   int num_threads_;
 };
 
-class FlatColumnReader::Impl {
+class ColumnReader::Impl {
  public:
   Impl(MemoryPool* pool, const ColumnDescriptor* descr, ParquetFileReader* reader,
       int column_index);
@@ -97,12 +100,16 @@ class FlatColumnReader::Impl {
 
   template <typename ArrowType>
   Status InitDataBuffer(int batch_size);
+  Status InitValidBits(int batch_size);
   template <typename ArrowType, typename ParquetType>
-  Status ReadNullableFlatBatch(TypedColumnReader<ParquetType>* reader,
-      int16_t* def_levels, int64_t values_to_read, int64_t* levels_read);
+  Status ReadNullableBatch(TypedColumnReader<ParquetType>* reader, int16_t* def_levels,
+      int16_t* rep_levels, int64_t values_to_read, int64_t* levels_read,
+      int64_t* values_read);
   template <typename ArrowType, typename ParquetType>
   Status ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
       int64_t values_to_read, int64_t* levels_read);
+  Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels,
+      int64_t total_values_read, std::shared_ptr<Array>* array);
 
  private:
   void NextRowGroup();
@@ -120,11 +127,12 @@ class FlatColumnReader::Impl {
   ParquetFileReader* reader_;
   int column_index_;
   int next_row_group_;
-  std::shared_ptr<ColumnReader> column_reader_;
+  std::shared_ptr<::parquet::ColumnReader> column_reader_;
   std::shared_ptr<Field> field_;
 
   PoolBuffer values_buffer_;
   PoolBuffer def_levels_buffer_;
+  PoolBuffer rep_levels_buffer_;
   std::shared_ptr<PoolBuffer> data_buffer_;
   uint8_t* data_buffer_ptr_;
   std::shared_ptr<PoolBuffer> valid_bits_buffer_;
@@ -136,41 +144,34 @@ class FlatColumnReader::Impl {
 FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
     : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
 
-bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) {
-  if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) {
-    return false;
-  } else if ((descr->max_definition_level() == 1) &&
-             (descr->schema_node()->repetition() != Repetition::OPTIONAL)) {
-    return false;
-  }
-  return true;
-}
-
-Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
+Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
   const SchemaDescriptor* schema = reader_->metadata()->schema();
 
-  if (!CheckForFlatColumn(schema->Column(i))) {
-    return Status::Invalid("The requested column is not flat");
-  }
-  std::unique_ptr<FlatColumnReader::Impl> impl(
-      new FlatColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i));
-  *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
+  std::unique_ptr<ColumnReader::Impl> impl(
+      new ColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i));
+  *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
   return Status::OK();
 }
 
-Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
-  std::unique_ptr<FlatColumnReader> flat_column_reader;
-  RETURN_NOT_OK(GetFlatColumn(i, &flat_column_reader));
-  return flat_column_reader->NextBatch(reader_->metadata()->num_rows(), out);
+Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
+  std::unique_ptr<ColumnReader> flat_column_reader;
+  RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
+
+  int64_t batch_size = 0;
+  for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
+    batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+  }
+
+  return flat_column_reader->NextBatch(batch_size, out);
 }
 
-Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
+Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
   std::vector<int> column_indices(reader_->metadata()->num_columns());
 
   for (size_t i = 0; i < column_indices.size(); ++i) {
     column_indices[i] = i;
   }
-  return ReadFlatTable(column_indices, table);
+  return ReadTable(column_indices, table);
 }
 
 template <class FUNCTION>
@@ -207,7 +208,7 @@ Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
   return Status::OK();
 }
 
-Status FileReader::Impl::ReadFlatTable(
+Status FileReader::Impl::ReadTable(
     const std::vector<int>& indices, std::shared_ptr<Table>* table) {
   auto descr = reader_->metadata()->schema();
 
@@ -219,19 +220,19 @@ Status FileReader::Impl::ReadFlatTable(
   int nthreads = std::min<int>(num_threads_, num_columns);
   std::vector<std::shared_ptr<Column>> columns(num_columns);
 
-  auto ReadColumn = [&indices, &schema, &columns, this](int i) {
+  auto ReadColumnFunc = [&indices, &schema, &columns, this](int i) {
     std::shared_ptr<Array> array;
-    RETURN_NOT_OK(ReadFlatColumn(indices[i], &array));
+    RETURN_NOT_OK(ReadColumn(indices[i], &array));
     columns[i] = std::make_shared<Column>(schema->field(i), array);
     return Status::OK();
   };
 
   if (nthreads == 1) {
     for (int i = 0; i < num_columns; i++) {
-      RETURN_NOT_OK(ReadColumn(i));
+      RETURN_NOT_OK(ReadColumnFunc(i));
     }
   } else {
-    RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumn));
+    RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc));
   }
 
   *table = std::make_shared<Table>(name, schema, columns);
@@ -261,30 +262,30 @@ Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
       file, allocator, ::parquet::default_reader_properties(), nullptr, reader);
 }
 
-Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
-  return impl_->GetFlatColumn(i, out);
+Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
+  return impl_->GetColumn(i, out);
 }
 
-Status FileReader::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
+Status FileReader::ReadColumn(int i, std::shared_ptr<Array>* out) {
   try {
-    return impl_->ReadFlatColumn(i, out);
+    return impl_->ReadColumn(i, out);
   } catch (const ::parquet::ParquetException& e) {
     return ::arrow::Status::IOError(e.what());
   }
 }
 
-Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
+Status FileReader::ReadTable(std::shared_ptr<Table>* out) {
   try {
-    return impl_->ReadFlatTable(out);
+    return impl_->ReadTable(out);
   } catch (const ::parquet::ParquetException& e) {
     return ::arrow::Status::IOError(e.what());
   }
 }
 
-Status FileReader::ReadFlatTable(
+Status FileReader::ReadTable(
     const std::vector<int>& column_indices, std::shared_ptr<Table>* out) {
   try {
-    return impl_->ReadFlatTable(column_indices, out);
+    return impl_->ReadTable(column_indices, out);
   } catch (const ::parquet::ParquetException& e) {
     return ::arrow::Status::IOError(e.what());
   }
@@ -298,7 +299,7 @@ const ParquetFileReader* FileReader::parquet_reader() const {
   return impl_->parquet_reader();
 }
 
-FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
+ColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
     ParquetFileReader* reader, int column_index)
     : pool_(pool),
       descr_(descr),
@@ -306,15 +307,15 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
       column_index_(column_index),
       next_row_group_(0),
       values_buffer_(pool),
-      def_levels_buffer_(pool) {
+      def_levels_buffer_(pool),
+      rep_levels_buffer_(pool) {
   NodeToField(descr_->schema_node(), &field_);
   NextRowGroup();
 }
 
 template <typename ArrowType, typename ParquetType>
-Status FlatColumnReader::Impl::ReadNonNullableBatch(
-    TypedColumnReader<ParquetType>* reader, int64_t values_to_read,
-    int64_t* levels_read) {
+Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
+    int64_t values_to_read, int64_t* levels_read) {
   using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
 
@@ -333,7 +334,7 @@ Status FlatColumnReader::Impl::ReadNonNullableBatch(
 
 #define NONNULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)                 \
   template <>                                                                      \
-  Status FlatColumnReader::Impl::ReadNonNullableBatch<ArrowType, ParquetType>(     \
+  Status ColumnReader::Impl::ReadNonNullableBatch<ArrowType, ParquetType>(         \
       TypedColumnReader<ParquetType> * reader, int64_t values_to_read,             \
       int64_t * levels_read) {                                                     \
     int64_t values_read;                                                           \
@@ -352,7 +353,7 @@ NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
 NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
 
 template <>
-Status FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
+Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
     TypedColumnReader<Int96Type>* reader, int64_t values_to_read, int64_t* levels_read) {
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96Type), false));
   auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
@@ -370,7 +371,7 @@ Status FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int9
 }
 
 template <>
-Status FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
+Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
     TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
     int64_t* levels_read) {
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
@@ -388,47 +389,49 @@ Status FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, Boolea
 }
 
 template <typename ArrowType, typename ParquetType>
-Status FlatColumnReader::Impl::ReadNullableFlatBatch(
-    TypedColumnReader<ParquetType>* reader, int16_t* def_levels, int64_t values_to_read,
-    int64_t* levels_read) {
+Status ColumnReader::Impl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader,
+    int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read,
+    int64_t* levels_read, int64_t* values_read) {
   using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
 
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
   auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
-  int null_count;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatchSpaced(values_to_read, def_levels, nullptr,
-                               values, &null_count, valid_bits_ptr_, valid_bits_idx_));
+  int64_t null_count;
+  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels,
+      values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count));
 
   auto data_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
   INIT_BITSET(valid_bits_ptr_, valid_bits_idx_);
 
-  for (int64_t i = 0; i < *levels_read; i++) {
-    if (bitset & (1 << bit_offset)) { data_ptr[valid_bits_idx_ + i] = values[i]; }
+  for (int64_t i = 0; i < *values_read; i++) {
+    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
+      data_ptr[valid_bits_idx_ + i] = values[i];
+    }
     READ_NEXT_BITSET(valid_bits_ptr_);
   }
   null_count_ += null_count;
-  valid_bits_idx_ += *levels_read;
+  valid_bits_idx_ += *values_read;
 
   return Status::OK();
 }
 
-#define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)                 \
-  template <>                                                                   \
-  Status FlatColumnReader::Impl::ReadNullableFlatBatch<ArrowType, ParquetType>( \
-      TypedColumnReader<ParquetType> * reader, int16_t * def_levels,            \
-      int64_t values_to_read, int64_t * levels_read) {                          \
-    auto data_ptr = reinterpret_cast<CType*>(data_buffer_ptr_);                 \
-    int null_count;                                                             \
-    PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatchSpaced(values_to_read, \
-                             def_levels, nullptr, data_ptr + valid_bits_idx_,   \
-                             &null_count, valid_bits_ptr_, valid_bits_idx_));   \
-                                                                                \
-    valid_bits_idx_ += *levels_read;                                            \
-    null_count_ += null_count;                                                  \
-                                                                                \
-    return Status::OK();                                                        \
+#define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)                          \
+  template <>                                                                            \
+  Status ColumnReader::Impl::ReadNullableBatch<ArrowType, ParquetType>(                  \
+      TypedColumnReader<ParquetType> * reader, int16_t * def_levels,                     \
+      int16_t * rep_levels, int64_t values_to_read, int64_t * levels_read,               \
+      int64_t * values_read) {                                                           \
+    auto data_ptr = reinterpret_cast<CType*>(data_buffer_ptr_);                          \
+    int64_t null_count;                                                                  \
+    PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels, \
+        data_ptr + valid_bits_idx_, valid_bits_ptr_, valid_bits_idx_, levels_read,       \
+        values_read, &null_count));                                                      \
+                                                                                         \
+    valid_bits_idx_ += *values_read;                                                     \
+    null_count_ += null_count;                                                           \
+                                                                                         \
+    return Status::OK();                                                                 \
   }
 
 NULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
@@ -437,56 +440,54 @@ NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
 NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
 
 template <>
-Status FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::TimestampType, Int96Type>(
-    TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int64_t values_to_read,
-    int64_t* levels_read) {
+Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
+    TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int16_t* rep_levels,
+    int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96Type), false));
   auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
-  int null_count;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatchSpaced(values_to_read, def_levels, nullptr,
-                               values, &null_count, valid_bits_ptr_, valid_bits_idx_));
+  int64_t null_count;
+  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels,
+      values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count));
 
   auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
   INIT_BITSET(valid_bits_ptr_, valid_bits_idx_);
-  for (int64_t i = 0; i < *levels_read; i++) {
-    if (bitset & (1 << bit_offset)) {
+  for (int64_t i = 0; i < *values_read; i++) {
+    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
       data_ptr[valid_bits_idx_ + i] = impala_timestamp_to_nanoseconds(values[i]);
     }
     READ_NEXT_BITSET(valid_bits_ptr_);
   }
   null_count_ += null_count;
-  valid_bits_idx_ += *levels_read;
+  valid_bits_idx_ += *values_read;
 
   return Status::OK();
 }
 
 template <>
-Status FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
-    TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int64_t values_to_read,
-    int64_t* levels_read) {
+Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
+    TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels,
+    int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
   RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
   auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
-  int null_count;
-  PARQUET_CATCH_NOT_OK(*levels_read =
-                           reader->ReadBatchSpaced(values_to_read, def_levels, nullptr,
-                               values, &null_count, valid_bits_ptr_, valid_bits_idx_));
+  int64_t null_count;
+  PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(values_to_read, def_levels, rep_levels,
+      values, valid_bits_ptr_, valid_bits_idx_, levels_read, values_read, &null_count));
 
   INIT_BITSET(valid_bits_ptr_, valid_bits_idx_);
-  for (int64_t i = 0; i < *levels_read; i++) {
-    if (bitset & (1 << bit_offset)) {
+  for (int64_t i = 0; i < *values_read; i++) {
+    if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
       if (values[i]) { ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_ + i); }
     }
     READ_NEXT_BITSET(valid_bits_ptr_);
   }
-  valid_bits_idx_ += *levels_read;
+  valid_bits_idx_ += *values_read;
   null_count_ += null_count;
 
   return Status::OK();
 }
 
 template <typename ArrowType>
-Status FlatColumnReader::Impl::InitDataBuffer(int batch_size) {
+Status ColumnReader::Impl::InitDataBuffer(int batch_size) {
   using ArrowCType = typename ArrowType::c_type;
   data_buffer_ = std::make_shared<PoolBuffer>(pool_);
   RETURN_NOT_OK(data_buffer_->Resize(batch_size * sizeof(ArrowCType), false));
@@ -496,7 +497,7 @@ Status FlatColumnReader::Impl::InitDataBuffer(int batch_size) {
 }
 
 template <>
-Status FlatColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
+Status ColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
   data_buffer_ = std::make_shared<PoolBuffer>(pool_);
   RETURN_NOT_OK(data_buffer_->Resize(::arrow::BitUtil::CeilByte(batch_size) / 8, false));
   data_buffer_ptr_ = data_buffer_->mutable_data();
@@ -505,13 +506,7 @@ Status FlatColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_si
   return Status::OK();
 }
 
-template <typename ArrowType, typename ParquetType>
-Status FlatColumnReader::Impl::TypedReadBatch(
-    int batch_size, std::shared_ptr<Array>* out) {
-  using ArrowCType = typename ArrowType::c_type;
-
-  int values_to_read = batch_size;
-  RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size));
+Status ColumnReader::Impl::InitValidBits(int batch_size) {
   valid_bits_idx_ = 0;
   if (descr_->max_definition_level() > 0) {
     int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size + 1) / 8;
@@ -521,93 +516,207 @@ Status FlatColumnReader::Impl::TypedReadBatch(
     memset(valid_bits_ptr_, 0, valid_bits_size);
     null_count_ = 0;
   }
+  return Status::OK();
+}
 
-  while ((values_to_read > 0) && column_reader_) {
-    if (descr_->max_definition_level() > 0) {
-      RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false));
+Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels,
+    const int16_t* rep_levels, int64_t total_levels_read, std::shared_ptr<Array>* array) {
+  if (descr_->max_repetition_level() > 0) {
+    std::shared_ptr<::arrow::Schema> arrow_schema;
+    RETURN_NOT_OK(
+        FromParquetSchema(reader_->metadata()->schema(), {column_index_}, &arrow_schema));
+
+    // Walk downwards to extract nullability
+    std::shared_ptr<Field> current_field = arrow_schema->field(0);
+    std::vector<bool> nullable;
+    std::vector<std::shared_ptr<::arrow::Int32Builder>> offset_builders;
+    std::vector<std::shared_ptr<::arrow::BooleanBuilder>> valid_bits_builders;
+    nullable.push_back(current_field->nullable);
+    while (current_field->type->num_children() > 0) {
+      if (current_field->type->num_children() > 1) {
+        return Status::NotImplemented(
+            "Fields with more than one child are not supported.");
+      } else {
+        if (current_field->type->type != ::arrow::Type::LIST) {
+          return Status::NotImplemented(
+              "Currently only nesting with Lists is supported.");
+        }
+        current_field = current_field->type->child(0);
+      }
+      offset_builders.emplace_back(
+          std::make_shared<::arrow::Int32Builder>(pool_, ::arrow::int32()));
+      valid_bits_builders.emplace_back(
+          std::make_shared<::arrow::BooleanBuilder>(pool_, ::arrow::boolean()));
+      nullable.push_back(current_field->nullable);
+    }
+
+    int64_t list_depth = offset_builders.size();
+    // This is the minimal definition level at which a level corresponds
+    // to a value in the primitive values array.
+    int16_t values_def_level = descr_->max_definition_level();
+    if (nullable[nullable.size() - 1]) { values_def_level--; }
+
+    // The definition levels that are needed so that a list is declared
+    // as empty and not null.
+    std::vector<int16_t> empty_def_level(list_depth);
+    int def_level = 0;
+    for (int i = 0; i < list_depth; i++) {
+      if (nullable[i]) { def_level++; }
+      empty_def_level[i] = def_level;
+      def_level++;
+    }
+
+    int32_t values_offset = 0;
+    std::vector<int64_t> null_counts(list_depth, 0);
+    for (int64_t i = 0; i < total_levels_read; i++) {
+      int16_t rep_level = rep_levels[i];
+      if (rep_level < descr_->max_repetition_level()) {
+        for (int64_t j = rep_level; j < list_depth; j++) {
+          if (j == (list_depth - 1)) {
+            RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
+          } else {
+            RETURN_NOT_OK(offset_builders[j]->Append(offset_builders[j + 1]->length()));
+          }
+
+          if (((empty_def_level[j] - 1) == def_levels[i]) && (nullable[j])) {
+            RETURN_NOT_OK(valid_bits_builders[j]->Append(false));
+            null_counts[j]++;
+            break;
+          } else {
+            RETURN_NOT_OK(valid_bits_builders[j]->Append(true));
+            if (empty_def_level[j] == def_levels[i]) { break; }
+          }
+        }
+      }
+      if (def_levels[i] >= values_def_level) { values_offset++; }
+    }
+    // Add the final offset to all lists
+    for (int64_t j = 0; j < list_depth; j++) {
+      if (j == (list_depth - 1)) {
+        RETURN_NOT_OK(offset_builders[j]->Append(values_offset));
+      } else {
+        RETURN_NOT_OK(offset_builders[j]->Append(offset_builders[j + 1]->length()));
+      }
+    }
+
+    std::vector<std::shared_ptr<Buffer>> offsets;
+    std::vector<std::shared_ptr<Buffer>> valid_bits;
+    std::vector<int64_t> list_lengths;
+    for (int64_t j = 0; j < list_depth; j++) {
+      list_lengths.push_back(offset_builders[j]->length() - 1);
+      std::shared_ptr<Array> array;
+      RETURN_NOT_OK(offset_builders[j]->Finish(&array));
+      offsets.emplace_back(std::static_pointer_cast<Int32Array>(array)->data());
+      RETURN_NOT_OK(valid_bits_builders[j]->Finish(&array));
+      valid_bits.emplace_back(std::static_pointer_cast<BooleanArray>(array)->data());
+    }
+
+    std::shared_ptr<Array> output(*array);
+    for (int64_t j = list_depth - 1; j >= 0; j--) {
+      auto list_type = std::make_shared<::arrow::ListType>(
+          std::make_shared<Field>("item", output->type(), nullable[j + 1]));
+      output = std::make_shared<::arrow::ListArray>(
+          list_type, list_lengths[j], offsets[j], output, null_counts[j], valid_bits[j]);
     }
+    *array = output;
+  }
+  return Status::OK();
+}
+
+template <typename ArrowType, typename ParquetType>
+Status ColumnReader::Impl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out) {
+  using ArrowCType = typename ArrowType::c_type;
+
+  int values_to_read = batch_size;
+  int total_levels_read = 0;
+  RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size));
+  RETURN_NOT_OK(InitValidBits(batch_size));
+  if (descr_->max_definition_level() > 0) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
+  }
+  if (descr_->max_repetition_level() > 0) {
+    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
+  }
+  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
+
+  while ((values_to_read > 0) && column_reader_) {
     auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
+    int64_t values_read;
     int64_t levels_read;
-    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
     if (descr_->max_definition_level() == 0) {
       RETURN_NOT_OK((ReadNonNullableBatch<ArrowType, ParquetType>(
-          reader, values_to_read, &levels_read)));
+          reader, values_to_read, &values_read)));
     } else {
-      // As per the defintion and checks for flat columns:
-      // descr_->max_definition_level() == 1
-      RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
-          reader, def_levels, values_to_read, &levels_read)));
+      // As per the definition and checks for flat (list) columns:
+      // descr_->max_definition_level() > 0, <= 3
+      RETURN_NOT_OK((ReadNullableBatch<ArrowType, ParquetType>(reader,
+          def_levels + total_levels_read, rep_levels + total_levels_read, values_to_read,
+          &levels_read, &values_read)));
+      total_levels_read += levels_read;
     }
-    values_to_read -= levels_read;
+    values_to_read -= values_read;
     if (!column_reader_->HasNext()) { NextRowGroup(); }
   }
 
+  // Shrink arrays as they may be larger than the output.
+  RETURN_NOT_OK(data_buffer_->Resize(valid_bits_idx_ * sizeof(ArrowCType)));
   if (descr_->max_definition_level() > 0) {
-    // TODO: Shrink arrays in the case they are too large
     if (valid_bits_idx_ < batch_size * 0.8) {
-      // Shrink arrays as they are larger than the output.
-      // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
-      //    without the need for a copy. Given a decent underlying allocator this
-      //    should still free some underlying pages to the OS.
-
-      auto data_buffer = std::make_shared<PoolBuffer>(pool_);
-      RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(ArrowCType), false));
-      memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
-      data_buffer_ = data_buffer;
-
-      auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
-      RETURN_NOT_OK(valid_bits_buffer->Resize(
+      RETURN_NOT_OK(valid_bits_buffer_->Resize(
           ::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8, false));
-      memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
-          valid_bits_buffer->size());
-      valid_bits_buffer_ = valid_bits_buffer;
     }
     *out = std::make_shared<ArrayType<ArrowType>>(
         field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_);
-    // Relase the ownership
-    data_buffer_.reset();
+    // Release the ownership as the Buffer is now part of a new Array
     valid_bits_buffer_.reset();
-    return Status::OK();
   } else {
     *out = std::make_shared<ArrayType<ArrowType>>(
         field_->type, valid_bits_idx_, data_buffer_);
-    data_buffer_.reset();
-    return Status::OK();
   }
+  // Release the ownership as the Buffer is now part of a new Array
+  data_buffer_.reset();
+
+  // Check if we should transform this array into a list array.
+  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
 }
 
 template <>
-Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
+Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
     int batch_size, std::shared_ptr<Array>* out) {
   int values_to_read = batch_size;
+  int total_levels_read = 0;
   RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size));
-  valid_bits_idx_ = 0;
+  RETURN_NOT_OK(InitValidBits(batch_size));
   if (descr_->max_definition_level() > 0) {
-    valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
-    int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size + 1) / 8;
-    RETURN_NOT_OK(valid_bits_buffer_->Resize(valid_bits_size, false));
-    valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
-    memset(valid_bits_ptr_, 0, valid_bits_size);
-    null_count_ = 0;
+    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
+  }
+  if (descr_->max_repetition_level() > 0) {
+    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
   }
+  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
 
   while ((values_to_read > 0) && column_reader_) {
     if (descr_->max_definition_level() > 0) {
       RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false));
     }
     auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
+    int64_t values_read;
     int64_t levels_read;
     int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
     if (descr_->max_definition_level() == 0) {
       RETURN_NOT_OK((ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
-          reader, values_to_read, &levels_read)));
+          reader, values_to_read, &values_read)));
     } else {
       // As per the defintion and checks for flat columns:
       // descr_->max_definition_level() == 1
-      RETURN_NOT_OK((ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
-          reader, def_levels, values_to_read, &levels_read)));
+      RETURN_NOT_OK((ReadNullableBatch<::arrow::BooleanType, BooleanType>(reader,
+          def_levels + total_levels_read, rep_levels + total_levels_read, values_to_read,
+          &levels_read, &values_read)));
+      total_levels_read += levels_read;
     }
-    values_to_read -= levels_read;
+    values_to_read -= values_read;
     if (!column_reader_->HasNext()) { NextRowGroup(); }
   }
 
@@ -631,39 +740,46 @@ Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>
           valid_bits_buffer->size());
       valid_bits_buffer_ = valid_bits_buffer;
     }
-    *out = std::make_shared<::arrow::BooleanArray>(
+    *out = std::make_shared<BooleanArray>(
         field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_);
     // Relase the ownership
     data_buffer_.reset();
     valid_bits_buffer_.reset();
-    return Status::OK();
   } else {
-    *out = std::make_shared<::arrow::BooleanArray>(
-        field_->type, valid_bits_idx_, data_buffer_);
+    *out = std::make_shared<BooleanArray>(field_->type, valid_bits_idx_, data_buffer_);
     data_buffer_.reset();
-    return Status::OK();
   }
+
+  // Check if we should transform this array into a list array.
+  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
 }
 
 template <typename ArrowType>
-Status FlatColumnReader::Impl::ReadByteArrayBatch(
+Status ColumnReader::Impl::ReadByteArrayBatch(
     int batch_size, std::shared_ptr<Array>* out) {
   using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
 
+  int total_levels_read = 0;
+  if (descr_->max_definition_level() > 0) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
+  }
+  if (descr_->max_repetition_level() > 0) {
+    RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
+  }
+  int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+  int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
+
   int values_to_read = batch_size;
   BuilderType builder(pool_, field_->type);
   while ((values_to_read > 0) && column_reader_) {
     RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ByteArray), false));
-    if (descr_->max_definition_level() > 0) {
-      RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false));
-    }
     auto reader = dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
     int64_t values_read;
     int64_t levels_read;
-    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
     auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
-    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
-                             values_to_read, def_levels, nullptr, values, &values_read));
+    PARQUET_CATCH_NOT_OK(
+        levels_read = reader->ReadBatch(values_to_read, def_levels + total_levels_read,
+            rep_levels + total_levels_read, values, &values_read));
     values_to_read -= levels_read;
     if (descr_->max_definition_level() == 0) {
       for (int64_t i = 0; i < levels_read; i++) {
@@ -671,32 +787,38 @@ Status FlatColumnReader::Impl::ReadByteArrayBatch(
             builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
       }
     } else {
-      // descr_->max_definition_level() == 1
+      // descr_->max_definition_level() > 0
       int values_idx = 0;
+      int nullable_elements = descr_->schema_node()->is_optional();
       for (int64_t i = 0; i < levels_read; i++) {
-        if (def_levels[i] < descr_->max_definition_level()) {
+        if (nullable_elements &&
+            (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
           RETURN_NOT_OK(builder.AppendNull());
-        } else {
+        } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
           RETURN_NOT_OK(
               builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
                   values[values_idx].len));
           values_idx++;
         }
       }
+      total_levels_read += levels_read;
     }
     if (!column_reader_->HasNext()) { NextRowGroup(); }
   }
-  return builder.Finish(out);
+
+  RETURN_NOT_OK(builder.Finish(out));
+  // Check if we should transform this array into a list array.
+  return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
 }
 
 template <>
-Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
+Status ColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
     int batch_size, std::shared_ptr<Array>* out) {
   return ReadByteArrayBatch<::arrow::BinaryType>(batch_size, out);
 }
 
 template <>
-Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
+Status ColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
     int batch_size, std::shared_ptr<Array>* out) {
   return ReadByteArrayBatch<::arrow::StringType>(batch_size, out);
 }
@@ -706,7 +828,7 @@ Status FlatColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType
     return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
     break;
 
-Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
   if (!column_reader_) {
     // Exhausted all row groups.
     *out = nullptr;
@@ -747,7 +869,7 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
   }
 }
 
-void FlatColumnReader::Impl::NextRowGroup() {
+void ColumnReader::Impl::NextRowGroup() {
   if (next_row_group_ < reader_->metadata()->num_row_groups()) {
     column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
     next_row_group_++;
@@ -756,11 +878,11 @@ void FlatColumnReader::Impl::NextRowGroup() {
   }
 }
 
-FlatColumnReader::FlatColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
 
-FlatColumnReader::~FlatColumnReader() {}
+ColumnReader::~ColumnReader() {}
 
-Status FlatColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+Status ColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
   return impl_->NextBatch(batch_size, out);
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 934b826..1aa9c3e 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -39,7 +39,7 @@ namespace parquet {
 
 namespace arrow {
 
-class FlatColumnReader;
+class ColumnReader;
 
 // Arrow read adapter class for deserializing Parquet files as Arrow row
 // batches.
@@ -94,17 +94,17 @@ class PARQUET_EXPORT FileReader {
   // fully-materialized arrow::Array instances
   //
   // Returns error status if the column of interest is not flat.
-  ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+  ::arrow::Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
 
   // Read column as a whole into an Array.
-  ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out);
+  ::arrow::Status ReadColumn(int i, std::shared_ptr<::arrow::Array>* out);
 
   // Read a table of flat columns into a Table.
-  ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out);
+  ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out);
 
   // Read a table of flat columns into a Table. Read only the indicated column
   // indices (relative to the schema)
-  ::arrow::Status ReadFlatTable(
+  ::arrow::Status ReadTable(
       const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out);
 
   const ParquetFileReader* parquet_reader() const;
@@ -126,9 +126,9 @@ class PARQUET_EXPORT FileReader {
 //
 // We also do not expose any internal Parquet details, such as row groups. This
 // might change in the future.
-class PARQUET_EXPORT FlatColumnReader {
+class PARQUET_EXPORT ColumnReader {
  public:
-  virtual ~FlatColumnReader();
+  virtual ~ColumnReader();
 
   // Scan the next array of the indicated size. The actual size of the
   // returned array may be less than the passed size depending how much data is
@@ -144,7 +144,7 @@ class PARQUET_EXPORT FlatColumnReader {
  private:
   class PARQUET_NO_EXPORT Impl;
   std::unique_ptr<Impl> impl_;
-  explicit FlatColumnReader(std::unique_ptr<Impl> impl);
+  explicit ColumnReader(std::unique_ptr<Impl> impl);
 
   friend class FileReader;
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 4760f0e..4d87dd8 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -177,6 +177,45 @@ typename std::enable_if<is_arrow_bool<ArrowType>::value, Status>::type NullableA
   return builder.Finish(out);
 }
 
+/// Wrap an Array into a ListArray by splitting it up into `size` lists.
+///
+/// This helper function only supports (size/2) nulls.
+Status MakeListArary(const std::shared_ptr<Array>& values, int64_t size,
+    int64_t null_count, bool nullable_values, std::shared_ptr<::arrow::ListArray>* out) {
+  // We always include an empty list
+  int64_t non_null_entries = size - null_count - 1;
+  int64_t length_per_entry = values->length() / non_null_entries;
+
+  auto offsets = std::make_shared<::arrow::PoolBuffer>(::arrow::default_memory_pool());
+  RETURN_NOT_OK(offsets->Resize((size + 1) * sizeof(int32_t)));
+  int32_t* offsets_ptr = reinterpret_cast<int32_t*>(offsets->mutable_data());
+
+  auto null_bitmap =
+      std::make_shared<::arrow::PoolBuffer>(::arrow::default_memory_pool());
+  int64_t bitmap_size = ::arrow::BitUtil::CeilByte(size) / 8;
+  RETURN_NOT_OK(null_bitmap->Resize(bitmap_size));
+  uint8_t* null_bitmap_ptr = null_bitmap->mutable_data();
+  memset(null_bitmap_ptr, 0, bitmap_size);
+
+  int32_t current_offset = 0;
+  for (int64_t i = 0; i < size; i++) {
+    offsets_ptr[i] = current_offset;
+    if (!(((i % 2) == 0) && ((i / 2) < null_count))) {
+      // Non-null list (list with index 1 is always empty).
+      ::arrow::BitUtil::SetBit(null_bitmap_ptr, i);
+      if (i != 1) { current_offset += length_per_entry; }
+    }
+  }
+  offsets_ptr[size] = values->length();
+
+  auto value_field =
+      std::make_shared<::arrow::Field>("item", values->type(), nullable_values);
+  *out = std::make_shared<::arrow::ListArray>(
+      ::arrow::list(value_field), size, offsets, values, null_count, null_bitmap);
+
+  return Status::OK();
+}
+
 std::shared_ptr<::arrow::Column> MakeColumn(
     const std::string& name, const std::shared_ptr<Array>& array, bool nullable) {
   auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);


Mime
View raw message