PARQUET-1100: Introduce RecordReader interface to better support nested data, refactor parquet/arrow/reader
We did not have consistent logic for reading values from leaf nodes versus reading semantic records when the maximum repetition level is greater than zero. This patch introduces a reader class that reads from column chunks until it identifies the end of a record, buffering the values (spaced, if the schema requires it) internally. This permitted a substantial refactoring and simplification of the code in parquet::arrow, where we had been manually interpreting batch reads as records.
As a follow-up patch, we should be able to take a collection of record readers from the same "tree" in a nested type, reassemble the intermediate Arrow structure, and deal with any redundant structure information in the repetition and definition levels. This should allow us to unify our nested-data read path so that we can read arbitrary nested structures. A sketch of the new consumption pattern follows.
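For illustration only, here is a minimal sketch of how a single column might be transferred through the new reader, based on the calls visible in the diff below (RecordReader::Make, values_written, nullable_values, ReleaseValues, ReleaseIsValid, null_count). The record-advancing call, named ReadRecords here, and the TransferColumn helper are assumptions for illustration, not part of this commit's public surface.

    // Hedged sketch; ReadRecords and TransferColumn are hypothetical names.
    #include "arrow/api.h"
    #include "parquet/arrow/record_reader.h"

    using parquet::internal::RecordReader;

    std::shared_ptr<::arrow::Array> TransferColumn(
        const parquet::ColumnDescriptor* descr, ::arrow::MemoryPool* pool) {
      std::shared_ptr<RecordReader> reader = RecordReader::Make(descr, pool);
      // ... feed the reader a PageReader per column chunk, then advance it
      // until the desired number of whole records has been delimited:
      // reader->ReadRecords(num_records);  // hypothetical entry point

      const int64_t length = reader->values_written();
      if (reader->nullable_values()) {
        // Values were decoded "spaced": take ownership of the value buffer
        // and the validity bitmap accumulated by the reader.
        std::shared_ptr<::arrow::PoolBuffer> values = reader->ReleaseValues();
        std::shared_ptr<::arrow::PoolBuffer> is_valid = reader->ReleaseIsValid();
        return std::make_shared<::arrow::Int64Array>(
            ::arrow::int64(), length, values, is_valid, reader->null_count());
      }
      return std::make_shared<::arrow::Int64Array>(::arrow::int64(), length,
                                                   reader->ReleaseValues());
      // Repetition/definition levels stay available via def_levels(),
      // rep_levels(), and levels_position() for list reassembly.
    }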
Author: Wes McKinney <wes.mckinney@twosigma.com>
Closes #398 from wesm/PARQUET-1100 and squashes the following commits:
9ea85d9 [Wes McKinney] Revert to const args
f4dc0fe [Wes McKinney] Make parquet::schema::Node non-copyable. Use const-refs instead of const-ptr for non-nullable argument
0d859cc [Wes McKinney] Code review comments, scrubbing some flakes
1368415 [Wes McKinney] Fix more MSVC warnings
eccb84c [Wes McKinney] Give macro more accurate name
0eaada0 [Wes McKinney] Use int64_t instead of int for batch sizes
79c3709 [Wes McKinney] Add documentation. Remove RecordReader from public API
8fa619b [Wes McKinney] Initialize memory in DecodeSpaced to avoid undefined behavior
5a0c860 [Wes McKinney] Remove non-repeated branch from DelimitRecords
c754e6e [Wes McKinney] Refactor to skip record delimiting for non-repeated data
ed2a03f [Wes McKinney] Move more code into TypedRecordReader
2e934e9 [Wes McKinney] Set some integers as const
58d3a0f [Wes McKinney] Do not index into levels arrays
b766371 [Wes McKinney] Add RecordReader::Reserve to preallocate, fixing perf regression. cpplint
1bf3e8f [Wes McKinney] Refactor to create stateful parquet::RecordReader class to better support nested data. Shift value buffering logic from parquet/arrow/reader into RecordReader. Fix bug described in PARQUET-1100
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/4b09ac70
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/4b09ac70
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/4b09ac70
Branch: refs/heads/master
Commit: 4b09ac703bc75fee72f94bed8ecfe571096b04c1
Parents: 18ca392
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Tue Sep 19 21:38:52 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Tue Sep 19 21:38:52 2017 -0400
----------------------------------------------------------------------
CMakeLists.txt | 1 +
.../arrow/arrow-reader-writer-benchmark.cc | 36 +-
src/parquet/arrow/arrow-reader-writer-test.cc | 208 +++-
src/parquet/arrow/reader.cc | 1048 ++++++------------
src/parquet/arrow/reader.h | 8 +-
src/parquet/arrow/record_reader.cc | 807 ++++++++++++++
src/parquet/arrow/record_reader.h | 113 ++
src/parquet/arrow/schema.cc | 140 ++-
src/parquet/arrow/schema.h | 17 +-
src/parquet/column_reader.cc | 5 +-
src/parquet/column_reader.h | 175 +--
src/parquet/column_writer-test.cc | 2 +-
src/parquet/encoding-internal.h | 4 +-
src/parquet/encoding.h | 4 +
src/parquet/file/metadata.h | 8 +-
src/parquet/file/printer.cc | 2 +-
src/parquet/file/reader-internal.cc | 2 +-
src/parquet/file/reader.cc | 7 +
src/parquet/file/reader.h | 2 +
src/parquet/schema-test.cc | 22 +-
src/parquet/schema.cc | 12 +-
src/parquet/schema.h | 9 +-
src/parquet/statistics-test.cc | 5 +-
src/parquet/test-util.h | 2 +-
src/parquet/util/memory-test.cc | 4 +-
src/parquet/util/memory.cc | 12 +-
src/parquet/util/memory.h | 2 +-
src/parquet/util/schema-util.h | 5 +-
28 files changed, 1701 insertions(+), 961 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 1deab40..ca37b5f 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -628,6 +628,7 @@ set(LIBPARQUET_SRCS
src/parquet/types.cc
src/parquet/arrow/reader.cc
+ src/parquet/arrow/record_reader.cc
src/parquet/arrow/schema.cc
src/parquet/arrow/writer.cc
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/arrow-reader-writer-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index e899e10..a54fb5d 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -17,6 +17,8 @@
#include "benchmark/benchmark.h"
+#include <iostream>
+
#include "parquet/arrow/reader.h"
#include "parquet/arrow/writer.h"
#include "parquet/column_reader.h"
@@ -30,13 +32,14 @@
using arrow::BooleanBuilder;
using arrow::NumericBuilder;
-#define ABORT_NOT_OK(s) \
- do { \
- ::arrow::Status _s = (s); \
- if (ARROW_PREDICT_FALSE(!_s.ok())) { \
- exit(-1); \
- } \
- } while (0);
+#define EXIT_NOT_OK(s) \
+ do { \
+ ::arrow::Status _s = (s); \
+ if (ARROW_PREDICT_FALSE(!_s.ok())) { \
+ std::cout << "Exiting: " << _s.ToString() << std::endl; \
+ exit(EXIT_FAILURE); \
+ } \
+ } while (0)
namespace parquet {
@@ -101,12 +104,12 @@ std::shared_ptr<::arrow::Table> TableFromVector(
std::vector<uint8_t> valid_bytes(BENCHMARK_SIZE, 0);
int n = {0};
std::generate(valid_bytes.begin(), valid_bytes.end(), [&n] { return n++ % 2; });
- ABORT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
+ EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), valid_bytes.data()));
} else {
- ABORT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
+ EXIT_NOT_OK(builder.Append(vec.data(), vec.size(), nullptr));
}
std::shared_ptr<::arrow::Array> array;
- ABORT_NOT_OK(builder.Finish(&array));
+ EXIT_NOT_OK(builder.Finish(&array));
auto field = ::arrow::field("column", type, nullable);
auto schema = std::make_shared<::arrow::Schema>(
@@ -125,12 +128,12 @@ std::shared_ptr<::arrow::Table> TableFromVector<BooleanType>(const std::vector<b
int n = {0};
std::generate(valid_bytes.begin(), valid_bytes.end(),
[&n] { return (n++ % 2) != 0; });
- ABORT_NOT_OK(builder.Append(vec, valid_bytes));
+ EXIT_NOT_OK(builder.Append(vec, valid_bytes));
} else {
- ABORT_NOT_OK(builder.Append(vec));
+ EXIT_NOT_OK(builder.Append(vec));
}
std::shared_ptr<::arrow::Array> array;
- ABORT_NOT_OK(builder.Finish(&array));
+ EXIT_NOT_OK(builder.Finish(&array));
auto field = ::arrow::field("column", ::arrow::boolean(), nullable);
auto schema = std::make_shared<::arrow::Schema>(
@@ -148,7 +151,7 @@ static void BM_WriteColumn(::benchmark::State& state) {
while (state.KeepRunning()) {
auto output = std::make_shared<InMemoryOutputStream>();
- ABORT_NOT_OK(
+ EXIT_NOT_OK(
WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
}
SetBytesProcessed<nullable, ParquetType>(state);
@@ -171,8 +174,7 @@ static void BM_ReadColumn(::benchmark::State& state) {
std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128);
std::shared_ptr<::arrow::Table> table = TableFromVector<ParquetType>(values, nullable);
auto output = std::make_shared<InMemoryOutputStream>();
- ABORT_NOT_OK(
- WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
+ EXIT_NOT_OK(WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE));
std::shared_ptr<Buffer> buffer = output->GetBuffer();
while (state.KeepRunning()) {
@@ -180,7 +182,7 @@ static void BM_ReadColumn(::benchmark::State& state) {
ParquetFileReader::Open(std::make_shared<::arrow::io::BufferReader>(buffer));
FileReader filereader(::arrow::default_memory_pool(), std::move(reader));
std::shared_ptr<::arrow::Table> table;
- ABORT_NOT_OK(filereader.ReadTable(&table));
+ EXIT_NOT_OK(filereader.ReadTable(&table));
}
SetBytesProcessed<nullable, ParquetType>(state);
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index a1e3382..56b4770 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -53,6 +53,9 @@ using arrow::TimeUnit;
using arrow::default_memory_pool;
using arrow::io::BufferReader;
+using arrow::test::randint;
+using arrow::test::random_is_valid;
+
using ArrowId = ::arrow::Type;
using ParquetType = parquet::Type;
using parquet::schema::GroupNode;
@@ -366,6 +369,45 @@ static std::shared_ptr<GroupNode> MakeSimpleSchema(const ::arrow::DataType& type
return std::static_pointer_cast<GroupNode>(node_);
}
+void AssertArraysEqual(const Array& expected, const Array& actual) {
+ if (!actual.Equals(expected)) {
+ std::stringstream pp_result;
+ std::stringstream pp_expected;
+
+ EXPECT_OK(::arrow::PrettyPrint(actual, 0, &pp_result));
+ EXPECT_OK(::arrow::PrettyPrint(expected, 0, &pp_expected));
+ FAIL() << "Got: \n" << pp_result.str() << "\nExpected: \n" << pp_expected.str();
+ }
+}
+
+void AssertChunkedEqual(const ChunkedArray& expected, const ChunkedArray& actual) {
+ ASSERT_EQ(expected.num_chunks(), actual.num_chunks()) << "# chunks unequal";
+ if (!actual.Equals(expected)) {
+ std::stringstream pp_result;
+ std::stringstream pp_expected;
+
+ for (int i = 0; i < actual.num_chunks(); ++i) {
+ auto c1 = actual.chunk(i);
+ auto c2 = expected.chunk(i);
+ if (!c1->Equals(*c2)) {
+ EXPECT_OK(::arrow::PrettyPrint(*c1, 0, &pp_result));
+ EXPECT_OK(::arrow::PrettyPrint(*c2, 0, &pp_expected));
+ FAIL() << "Chunk " << i << " Got: " << pp_result.str()
+ << "\nExpected: " << pp_expected.str();
+ }
+ }
+ }
+}
+
+void AssertTablesEqual(const Table& expected, const Table& actual) {
+ ASSERT_EQ(expected.num_columns(), actual.num_columns());
+
+ for (int i = 0; i < actual.num_columns(); ++i) {
+ AssertChunkedEqual(*expected.column(i)->data(), *actual.column(i)->data());
+ }
+ ASSERT_TRUE(actual.Equals(expected));
+}
+
template <typename TestType>
class TestParquetIO : public ::testing::Test {
public:
@@ -394,13 +436,14 @@ class TestParquetIO : public ::testing::Test {
ASSERT_NE(nullptr, out->get());
}
- void ReadAndCheckSingleColumnFile(::arrow::Array* values) {
- std::shared_ptr<::arrow::Array> out;
+ void ReadAndCheckSingleColumnFile(const Array& values) {
+ std::shared_ptr<Array> out;
std::unique_ptr<FileReader> reader;
ReaderFromSink(&reader);
ReadSingleColumnFile(std::move(reader), &out);
- ASSERT_TRUE(values->Equals(out));
+
+ AssertArraysEqual(values, *out);
}
void ReadTableFromFile(std::unique_ptr<FileReader> reader,
@@ -440,7 +483,7 @@ class TestParquetIO : public ::testing::Test {
*out = MakeSimpleTable(parent_lists, nullable_parent_lists);
}
- void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+ void ReadAndCheckSingleColumnTable(const std::shared_ptr<Array>& values) {
std::shared_ptr<::arrow::Table> out;
std::unique_ptr<FileReader> reader;
ReaderFromSink(&reader);
@@ -452,13 +495,14 @@ class TestParquetIO : public ::testing::Test {
ASSERT_EQ(1, chunked_array->num_chunks());
auto result = chunked_array->chunk(0);
- ASSERT_TRUE(values->Equals(result));
+ AssertArraysEqual(*values, *result);
}
void CheckRoundTrip(const std::shared_ptr<Table>& table) {
std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
- ASSERT_TRUE(table->Equals(*result));
+
+ AssertTablesEqual(*table, *result);
}
template <typename ArrayType>
@@ -495,7 +539,7 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredWrite) {
MakeSimpleSchema(*values->type(), Repetition::REQUIRED);
this->WriteColumn(schema, values);
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
@@ -515,7 +559,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+ AssertArraysEqual(*values, *chunked_array->chunk(0));
}
TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
@@ -528,7 +573,7 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
MakeSimpleSchema(*values->type(), Repetition::OPTIONAL);
this->WriteColumn(schema, values);
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) {
@@ -547,7 +592,7 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalDictionaryWrite) {
MakeSimpleSchema(*dict_values->type(), Repetition::OPTIONAL);
this->WriteColumn(schema, dict_values);
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
@@ -558,12 +603,12 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
this->WriteColumn(schema, sliced_values);
- this->ReadAndCheckSingleColumnFile(sliced_values.get());
+ this->ReadAndCheckSingleColumnFile(*sliced_values);
// Slice offset 1 higher
sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
this->WriteColumn(schema, sliced_values);
- this->ReadAndCheckSingleColumnFile(sliced_values.get());
+ this->ReadAndCheckSingleColumnFile(*sliced_values);
}
TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
@@ -574,12 +619,12 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
this->WriteColumn(schema, sliced_values);
- this->ReadAndCheckSingleColumnFile(sliced_values.get());
+ this->ReadAndCheckSingleColumnFile(*sliced_values);
// Slice offset 1 higher, thus different null bitmap.
sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
this->WriteColumn(schema, sliced_values);
- this->ReadAndCheckSingleColumnFile(sliced_values.get());
+ this->ReadAndCheckSingleColumnFile(*sliced_values);
}
TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
@@ -636,7 +681,7 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
}
ASSERT_OK_NO_THROW(writer.Close());
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
@@ -679,7 +724,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+ AssertArraysEqual(*values, *chunked_array->chunk(0));
}
TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
@@ -698,7 +744,7 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
}
ASSERT_OK_NO_THROW(writer.Close());
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
@@ -763,7 +809,7 @@ TEST_F(TestInt96ParquetIO, ReadIntoTimestamp) {
ASSERT_OK(builder.Append(val));
std::shared_ptr<Array> values;
ASSERT_OK(builder.Finish(&values));
- this->ReadAndCheckSingleColumnFile(values.get());
+ this->ReadAndCheckSingleColumnFile(*values);
}
using TestUInt32ParquetIO = TestParquetIO<::arrow::UInt32Type>;
@@ -850,7 +896,8 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+ AssertArraysEqual(*values, *chunked_array->chunk(0));
}
using TestNullParquetIO = TestParquetIO<::arrow::NullType>;
@@ -871,7 +918,8 @@ TEST_F(TestNullParquetIO, NullColumn) {
std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
ASSERT_EQ(1, chunked_array->num_chunks());
- ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+
+ AssertArraysEqual(*values, *chunked_array->chunk(0));
}
template <typename T>
@@ -1026,14 +1074,14 @@ TEST(TestArrowReadWrite, DateTimeTypes) {
table, 1, table->num_rows(), {}, &result,
ArrowWriterProperties::Builder().enable_deprecated_int96_timestamps()->build());
- ASSERT_TRUE(table->Equals(*result));
+ AssertTablesEqual(*table, *result);
// Cast nanoseconds to microseconds and use INT64 physical type
DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
std::shared_ptr<Table> expected;
MakeDateTimeTypesTable(&table, true);
- ASSERT_TRUE(table->Equals(*result));
+ AssertTablesEqual(*table, *result);
}
TEST(TestArrowReadWrite, CoerceTimestamps) {
@@ -1097,13 +1145,14 @@ TEST(TestArrowReadWrite, CoerceTimestamps) {
DoSimpleRoundtrip(
input, 1, input->num_rows(), {}, &milli_result,
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MILLI)->build());
- ASSERT_TRUE(milli_result->Equals(*ex_milli_result));
+
+ AssertTablesEqual(*ex_milli_result, *milli_result);
std::shared_ptr<Table> micro_result;
DoSimpleRoundtrip(
input, 1, input->num_rows(), {}, &micro_result,
ArrowWriterProperties::Builder().coerce_timestamps(TimeUnit::MICRO)->build());
- ASSERT_TRUE(micro_result->Equals(*ex_micro_result));
+ AssertTablesEqual(*ex_micro_result, *micro_result);
}
TEST(TestArrowReadWrite, CoerceTimestampsLosePrecision) {
@@ -1213,7 +1262,7 @@ TEST(TestArrowReadWrite, ConvertedDateTimeTypes) {
std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
- ASSERT_TRUE(result->Equals(*ex_table));
+ AssertTablesEqual(*ex_table, *result);
}
void MakeDoubleTable(int num_columns, int num_rows, int nchunks,
@@ -1253,7 +1302,7 @@ TEST(TestArrowReadWrite, MultithreadedRead) {
std::shared_ptr<Table> result;
DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result);
- ASSERT_TRUE(table->Equals(*result));
+ AssertTablesEqual(*table, *result);
}
TEST(TestArrowReadWrite, ReadSingleRowGroup) {
@@ -1328,7 +1377,92 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
auto ex_schema = std::make_shared<::arrow::Schema>(ex_fields);
Table expected(ex_schema, ex_columns);
- ASSERT_TRUE(result->Equals(expected));
+ AssertTablesEqual(expected, *result);
+}
+
+void MakeListTable(int num_rows, std::shared_ptr<Table>* out) {
+ ::arrow::Int32Builder offset_builder;
+
+ std::vector<int32_t> length_draws;
+ randint(num_rows, 0, 100, &length_draws);
+
+ std::vector<int32_t> offset_values;
+
+ // Make sure some of them are length 0
+ int32_t total_elements = 0;
+ for (size_t i = 0; i < length_draws.size(); ++i) {
+ if (length_draws[i] < 10) {
+ length_draws[i] = 0;
+ }
+ offset_values.push_back(total_elements);
+ total_elements += length_draws[i];
+ }
+ offset_values.push_back(total_elements);
+
+ std::vector<int8_t> value_draws;
+ randint<int8_t>(total_elements, 0, 100, &value_draws);
+
+ std::vector<bool> is_valid;
+ random_is_valid(total_elements, 0.1, &is_valid);
+
+ std::shared_ptr<Array> values, offsets;
+ ::arrow::ArrayFromVector<::arrow::Int8Type, int8_t>(::arrow::int8(), is_valid,
+ value_draws, &values);
+ ::arrow::ArrayFromVector<::arrow::Int32Type, int32_t>(offset_values, &offsets);
+
+ std::shared_ptr<Array> list_array;
+ ASSERT_OK(::arrow::ListArray::FromArrays(*offsets, *values, default_memory_pool(),
+ &list_array));
+
+ auto f1 = ::arrow::field("a", ::arrow::list(::arrow::int8()));
+ auto schema = ::arrow::schema({f1});
+ std::vector<std::shared_ptr<Array>> arrays = {list_array};
+ *out = std::make_shared<Table>(schema, arrays);
+}
+
+TEST(TestArrowReadWrite, ListLargeRecords) {
+ const int num_rows = 50;
+
+ std::shared_ptr<Table> table;
+ MakeListTable(num_rows, &table);
+
+ std::shared_ptr<Buffer> buffer;
+ WriteTableToBuffer(table, 1, 100, default_arrow_writer_properties(), &buffer);
+
+ std::unique_ptr<FileReader> reader;
+ ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+ ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, &reader));
+
+ // Read everything
+ std::shared_ptr<Table> result;
+ ASSERT_OK_NO_THROW(reader->ReadTable(&result));
+ AssertTablesEqual(*table, *result);
+
+ ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
+ ::arrow::default_memory_pool(),
+ ::parquet::default_reader_properties(), nullptr, &reader));
+
+ std::unique_ptr<ColumnReader> col_reader;
+ ASSERT_OK(reader->GetColumn(0, &col_reader));
+
+ auto expected = table->column(0)->data()->chunk(0);
+
+ std::vector<std::shared_ptr<Array>> pieces;
+ for (int i = 0; i < num_rows; ++i) {
+ std::shared_ptr<Array> piece;
+ ASSERT_OK(col_reader->NextBatch(1, &piece));
+ ASSERT_EQ(1, piece->length());
+ pieces.push_back(piece);
+ }
+ auto chunked = std::make_shared<::arrow::ChunkedArray>(pieces);
+
+ auto chunked_col =
+ std::make_shared<::arrow::Column>(table->schema()->field(0), chunked);
+ std::vector<std::shared_ptr<::arrow::Column>> columns = {chunked_col};
+ auto chunked_table = std::make_shared<Table>(table->schema(), columns);
+
+ ASSERT_TRUE(table->Equals(*chunked_table));
}
TEST(TestArrowWrite, CheckChunkSize) {
@@ -1359,6 +1493,7 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
void InitNewParquetFile(const std::shared_ptr<GroupNode>& schema, int num_rows) {
nested_parquet_ = std::make_shared<InMemoryOutputStream>();
+
writer_ = parquet::ParquetFileWriter::Open(nested_parquet_, schema,
default_writer_properties());
row_group_writer_ = writer_->AppendRowGroup(num_rows);
@@ -1397,7 +1532,6 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
void ValidateColumnArray(const ::arrow::Int32Array& array, size_t expected_nulls) {
ValidateArray(array, expected_nulls);
-
int j = 0;
for (int i = 0; i < values_array_->length(); i++) {
if (array.IsNull(i)) {
@@ -1515,15 +1649,19 @@ class TestNestedSchemaRead : public ::testing::TestWithParam<Repetition::type> {
int num_columns = num_trees * static_cast<int>((std::pow(num_children, tree_depth)));
- std::vector<int16_t> def_levels(num_rows);
- std::vector<int16_t> rep_levels(num_rows);
- for (int i = 0; i < num_rows; i++) {
+ std::vector<int16_t> def_levels;
+ std::vector<int16_t> rep_levels;
+
+ int num_levels = 0;
+ while (num_levels < num_rows) {
if (node_repetition == Repetition::REQUIRED) {
- def_levels[i] = 0; // all is required
+ def_levels.push_back(0); // all are required
} else {
- def_levels[i] = i % tree_depth; // all is optional
+ int16_t level = static_cast<int16_t>(num_levels % (tree_depth + 2));
+ def_levels.push_back(level); // all are optional
}
- rep_levels[i] = 0; // none is repeated
+ rep_levels.push_back(0); // none is repeated
+ ++num_levels;
}
// Produce values for the columns
@@ -1675,7 +1813,7 @@ TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) {
const int num_trees = 10;
const int depth = 5;
const int num_children = 3;
- int num_rows = SMALL_SIZE * depth;
+ int num_rows = SMALL_SIZE * (depth + 2);
CreateMultiLevelNestedParquet(num_trees, depth, num_children, num_rows, GetParam());
std::shared_ptr<Table> table;
ASSERT_OK_NO_THROW(reader_->ReadTable(&table));
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 8d5ea7e..5edc837 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -24,13 +24,18 @@
#include <queue>
#include <string>
#include <thread>
+#include <type_traits>
#include <vector>
#include "arrow/api.h"
#include "arrow/util/bit-util.h"
#include "arrow/util/logging.h"
+#include "arrow/util/parallel.h"
+#include "parquet/arrow/record_reader.h"
#include "parquet/arrow/schema.h"
+#include "parquet/column_reader.h"
+#include "parquet/schema.h"
#include "parquet/util/schema-util.h"
using arrow::Array;
@@ -40,21 +45,28 @@ using arrow::Field;
using arrow::Int32Array;
using arrow::ListArray;
using arrow::StructArray;
+using arrow::TimestampArray;
using arrow::MemoryPool;
using arrow::PoolBuffer;
using arrow::Status;
using arrow::Table;
-using parquet::schema::NodePtr;
+using parquet::schema::Node;
// Help reduce verbosity
using ParquetReader = parquet::ParquetFileReader;
+using arrow::ParallelFor;
+
+using parquet::internal::RecordReader;
namespace parquet {
namespace arrow {
+using ::arrow::BitUtil::BytesForBits;
+
constexpr int64_t kJulianToUnixEpochDays = 2440588LL;
-constexpr int64_t kNanosecondsInADay = 86400LL * 1000LL * 1000LL * 1000LL;
+constexpr int64_t kMillisecondsInADay = 86400000LL;
+constexpr int64_t kNanosecondsInADay = kMillisecondsInADay * 1000LL * 1000LL;
static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timestamp) {
int64_t days_since_epoch = impala_timestamp.value[2] - kJulianToUnixEpochDays;
@@ -66,47 +78,6 @@ template <typename ArrowType>
using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
// ----------------------------------------------------------------------
-// Helper for parallel for-loop
-
-template <class FUNCTION>
-Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
- std::vector<std::thread> thread_pool;
- thread_pool.reserve(nthreads);
- std::atomic<int> task_counter(0);
-
- std::mutex error_mtx;
- bool error_occurred = false;
- Status error;
-
- for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
- thread_pool.emplace_back(
- [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
- int task_id;
- while (!error_occurred) {
- task_id = task_counter.fetch_add(1);
- if (task_id >= num_tasks) {
- break;
- }
- Status s = func(task_id);
- if (!s.ok()) {
- std::lock_guard<std::mutex> lock(error_mtx);
- error_occurred = true;
- error = s;
- break;
- }
- }
- });
- }
- for (auto&& thread : thread_pool) {
- thread.join();
- }
- if (error_occurred) {
- return error;
- }
- return Status::OK();
-}
-
-// ----------------------------------------------------------------------
// Iteration utilities
// Abstraction to decouple row group iteration details from the ColumnReader,
@@ -120,7 +91,7 @@ class FileColumnIterator {
virtual ~FileColumnIterator() {}
- virtual std::shared_ptr<::parquet::ColumnReader> Next() = 0;
+ virtual std::unique_ptr<::parquet::PageReader> NextChunk() = 0;
const SchemaDescriptor* schema() const { return schema_; }
@@ -141,10 +112,10 @@ class AllRowGroupsIterator : public FileColumnIterator {
explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader)
: FileColumnIterator(column_index, reader), next_row_group_(0) {}
- std::shared_ptr<::parquet::ColumnReader> Next() override {
- std::shared_ptr<::parquet::ColumnReader> result;
+ std::unique_ptr<::parquet::PageReader> NextChunk() override {
+ std::unique_ptr<::parquet::PageReader> result;
if (next_row_group_ < reader_->metadata()->num_row_groups()) {
- result = reader_->RowGroup(next_row_group_)->Column(column_index_);
+ result = reader_->RowGroup(next_row_group_)->GetColumnPageReader(column_index_);
next_row_group_++;
} else {
result = nullptr;
@@ -164,12 +135,13 @@ class SingleRowGroupIterator : public FileColumnIterator {
row_group_number_(row_group_number),
done_(false) {}
- std::shared_ptr<::parquet::ColumnReader> Next() override {
+ std::unique_ptr<::parquet::PageReader> NextChunk() override {
if (done_) {
return nullptr;
}
- auto result = reader_->RowGroup(row_group_number_)->Column(column_index_);
+ auto result =
+ reader_->RowGroup(row_group_number_)->GetColumnPageReader(column_index_);
done_ = true;
return result;
};
@@ -193,8 +165,9 @@ class FileReader::Impl {
Status ReadSchemaField(int i, std::shared_ptr<Array>* out);
Status ReadSchemaField(int i, const std::vector<int>& indices,
std::shared_ptr<Array>* out);
- Status GetReaderForNode(int index, const NodePtr& node, const std::vector<int>& indices,
- int16_t def_level, std::unique_ptr<ColumnReader::Impl>* out);
+ Status GetReaderForNode(int index, const Node* node, const std::vector<int>& indices,
+ int16_t def_level,
+ std::unique_ptr<ColumnReader::ColumnReaderImpl>* out);
Status ReadColumn(int i, std::shared_ptr<Array>* out);
Status GetSchema(std::shared_ptr<::arrow::Schema>* out);
Status GetSchema(const std::vector<int>& indices,
@@ -223,96 +196,54 @@ class FileReader::Impl {
int num_threads_;
};
-typedef const int16_t* ValueLevelsPtr;
-
-class ColumnReader::Impl {
+class ColumnReader::ColumnReaderImpl {
public:
- virtual ~Impl() {}
- virtual Status NextBatch(int batch_size, std::shared_ptr<Array>* out) = 0;
- virtual Status GetDefLevels(ValueLevelsPtr* data, size_t* length) = 0;
- virtual Status GetRepLevels(ValueLevelsPtr* data, size_t* length) = 0;
+ virtual ~ColumnReaderImpl() {}
+ virtual Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) = 0;
+ virtual Status GetDefLevels(const int16_t** data, size_t* length) = 0;
+ virtual Status GetRepLevels(const int16_t** data, size_t* length) = 0;
virtual const std::shared_ptr<Field> field() = 0;
};
// Reader implementation for primitive arrays
-class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::Impl {
+class PARQUET_NO_EXPORT PrimitiveImpl : public ColumnReader::ColumnReaderImpl {
public:
PrimitiveImpl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
- : pool_(pool),
- input_(std::move(input)),
- descr_(input_->descr()),
- values_buffer_(pool),
- def_levels_buffer_(pool),
- rep_levels_buffer_(pool) {
- DCHECK(NodeToField(input_->descr()->schema_node(), &field_).ok());
+ : pool_(pool), input_(std::move(input)), descr_(input_->descr()) {
+ record_reader_ = RecordReader::Make(descr_, pool_);
+ DCHECK(NodeToField(*input_->descr()->schema_node(), &field_).ok());
NextRowGroup();
}
virtual ~PrimitiveImpl() {}
- Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
-
- template <typename ArrowType, typename ParquetType>
- Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
-
- template <typename ArrowType>
- Status ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out);
+ Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) override;
- template <typename ArrowType>
- Status ReadFLBABatch(int batch_size, int byte_width, std::shared_ptr<Array>* out);
+ template <typename ParquetType>
+ Status WrapIntoListArray(std::shared_ptr<Array>* array);
- template <typename ArrowType>
- Status InitDataBuffer(int batch_size);
- Status InitValidBits(int batch_size);
- template <typename ArrowType, typename ParquetType>
- Status ReadNullableBatch(TypedColumnReader<ParquetType>* reader, int16_t* def_levels,
- int16_t* rep_levels, int64_t values_to_read,
- int64_t* levels_read, int64_t* values_read);
- template <typename ArrowType, typename ParquetType>
- Status ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
- int64_t values_to_read, int64_t* levels_read);
- Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels,
- int64_t total_values_read, std::shared_ptr<Array>* array);
-
- Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
- Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+ Status GetDefLevels(const int16_t** data, size_t* length) override;
+ Status GetRepLevels(const int16_t** data, size_t* length) override;
const std::shared_ptr<Field> field() override { return field_; }
private:
void NextRowGroup();
- template <typename InType, typename OutType>
- struct can_copy_ptr {
- static constexpr bool value =
- std::is_same<InType, OutType>::value ||
- (std::is_integral<InType>{} && std::is_integral<OutType>{} &&
- (sizeof(InType) == sizeof(OutType)));
- };
-
MemoryPool* pool_;
std::unique_ptr<FileColumnIterator> input_;
const ColumnDescriptor* descr_;
- std::shared_ptr<::parquet::ColumnReader> column_reader_;
- std::shared_ptr<Field> field_;
+ std::shared_ptr<RecordReader> record_reader_;
- PoolBuffer values_buffer_;
- PoolBuffer def_levels_buffer_;
- PoolBuffer rep_levels_buffer_;
- std::shared_ptr<PoolBuffer> data_buffer_;
- uint8_t* data_buffer_ptr_;
- std::shared_ptr<PoolBuffer> valid_bits_buffer_;
- uint8_t* valid_bits_ptr_;
- int64_t valid_bits_idx_;
- int64_t null_count_;
+ std::shared_ptr<Field> field_;
};
// Reader implementation for struct array
-class PARQUET_NO_EXPORT StructImpl : public ColumnReader::Impl {
+class PARQUET_NO_EXPORT StructImpl : public ColumnReader::ColumnReaderImpl {
public:
- explicit StructImpl(const std::vector<std::shared_ptr<Impl>>& children,
- int16_t struct_def_level, MemoryPool* pool, const NodePtr& node)
+ explicit StructImpl(const std::vector<std::shared_ptr<ColumnReaderImpl>>& children,
+ int16_t struct_def_level, MemoryPool* pool, const Node* node)
: children_(children),
struct_def_level_(struct_def_level),
pool_(pool),
@@ -322,21 +253,21 @@ class PARQUET_NO_EXPORT StructImpl : public ColumnReader::Impl {
virtual ~StructImpl() {}
- Status NextBatch(int batch_size, std::shared_ptr<Array>* out) override;
- Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override;
- Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override;
+ Status NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) override;
+ Status GetDefLevels(const int16_t** data, size_t* length) override;
+ Status GetRepLevels(const int16_t** data, size_t* length) override;
const std::shared_ptr<Field> field() override { return field_; }
private:
- std::vector<std::shared_ptr<Impl>> children_;
+ std::vector<std::shared_ptr<ColumnReaderImpl>> children_;
int16_t struct_def_level_;
MemoryPool* pool_;
std::shared_ptr<Field> field_;
PoolBuffer def_levels_buffer_;
- Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap,
- int64_t* null_count);
- void InitField(const NodePtr& node, const std::vector<std::shared_ptr<Impl>>& children);
+ Status DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap, int64_t* null_count);
+ void InitField(const Node* node,
+ const std::vector<std::shared_ptr<ColumnReaderImpl>>& children);
};
FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
@@ -347,26 +278,26 @@ FileReader::~FileReader() {}
Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
- std::unique_ptr<ColumnReader::Impl> impl(new PrimitiveImpl(pool_, std::move(input)));
+ std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+ new PrimitiveImpl(pool_, std::move(input)));
*out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
return Status::OK();
}
-Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
- const std::vector<int>& indices,
- int16_t def_level,
- std::unique_ptr<ColumnReader::Impl>* out) {
+Status FileReader::Impl::GetReaderForNode(
+ int index, const Node* node, const std::vector<int>& indices, int16_t def_level,
+ std::unique_ptr<ColumnReader::ColumnReaderImpl>* out) {
*out = nullptr;
if (IsSimpleStruct(node)) {
- const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node.get());
- std::vector<std::shared_ptr<ColumnReader::Impl>> children;
+ const schema::GroupNode* group = static_cast<const schema::GroupNode*>(node);
+ std::vector<std::shared_ptr<ColumnReader::ColumnReaderImpl>> children;
for (int i = 0; i < group->field_count(); i++) {
- std::unique_ptr<ColumnReader::Impl> child_reader;
+ std::unique_ptr<ColumnReader::ColumnReaderImpl> child_reader;
// TODO(itaiin): Remove the -1 index hack when all types of nested reads
// are supported. This currently just signals the lower level reader resolution
// to abort
- RETURN_NOT_OK(GetReaderForNode(index, group->field(i), indices, def_level + 1,
+ RETURN_NOT_OK(GetReaderForNode(index, group->field(i).get(), indices, def_level + 1,
&child_reader));
if (child_reader != nullptr) {
children.push_back(std::move(child_reader));
@@ -374,22 +305,22 @@ Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node,
}
if (children.size() > 0) {
- *out = std::unique_ptr<ColumnReader::Impl>(
+ *out = std::unique_ptr<ColumnReader::ColumnReaderImpl>(
new StructImpl(children, def_level, pool_, node));
}
} else {
// This should be a flat field case - translate the field index to
// the correct column index by walking down to the leaf node
- NodePtr walker = node;
+ const Node* walker = node;
while (!walker->is_primitive()) {
DCHECK(walker->is_group());
- auto group = static_cast<GroupNode*>(walker.get());
+ auto group = static_cast<const GroupNode*>(walker);
if (group->field_count() != 1) {
return Status::NotImplemented("lists with structs are not supported.");
}
- walker = group->field(0);
+ walker = group->field(0).get();
}
- auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker.get());
+ auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker);
// If the index of the column is found then a reader for the column is needed.
// Otherwise *out keeps the nullptr value.
@@ -417,8 +348,8 @@ Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
std::shared_ptr<Array>* out) {
auto parquet_schema = reader_->metadata()->schema();
- auto node = parquet_schema->group_node()->field(i);
- std::unique_ptr<ColumnReader::Impl> reader_impl;
+ auto node = parquet_schema->group_node()->field(i).get();
+ std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl));
if (reader_impl == nullptr) {
@@ -428,24 +359,28 @@ Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
std::unique_ptr<ColumnReader> reader(new ColumnReader(std::move(reader_impl)));
- int64_t batch_size = 0;
- for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
- batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+ // TODO(wesm): This calculation doesn't make much sense when we have repeated
+ // schema nodes
+ int64_t records_to_read = 0;
+
+ const FileMetaData& metadata = *reader_->metadata();
+ for (int j = 0; j < metadata.num_row_groups(); j++) {
+ records_to_read += metadata.RowGroup(j)->ColumnChunk(i)->num_values();
}
- return reader->NextBatch(static_cast<int>(batch_size), out);
+ return reader->NextBatch(records_to_read, out);
}
Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
std::unique_ptr<ColumnReader> flat_column_reader;
RETURN_NOT_OK(GetColumn(i, &flat_column_reader));
- int64_t batch_size = 0;
+ int64_t records_to_read = 0;
for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) {
- batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
+ records_to_read += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values();
}
- return flat_column_reader->NextBatch(static_cast<int>(batch_size), out);
+ return flat_column_reader->NextBatch(records_to_read, out);
}
Status FileReader::Impl::GetSchema(const std::vector<int>& indices,
@@ -472,16 +407,17 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index,
auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
this](int i) {
int column_index = indices[i];
- int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values();
+ int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values();
std::unique_ptr<FileColumnIterator> input(
new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
- std::unique_ptr<ColumnReader::Impl> impl(new PrimitiveImpl(pool_, std::move(input)));
+ std::unique_ptr<ColumnReader::ColumnReaderImpl> impl(
+ new PrimitiveImpl(pool_, std::move(input)));
ColumnReader flat_column_reader(std::move(impl));
std::shared_ptr<Array> array;
- RETURN_NOT_OK(flat_column_reader.NextBatch(static_cast<int>(batch_size), &array));
+ RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array));
columns[i] = std::make_shared<Column>(schema->field(i), array);
return Status::OK();
};
@@ -642,282 +578,12 @@ const ParquetFileReader* FileReader::parquet_reader() const {
return impl_->parquet_reader();
}
-template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
- int64_t values_to_read, int64_t* levels_read) {
- using ArrowCType = typename ArrowType::c_type;
- using ParquetCType = typename ParquetType::c_type;
-
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
- auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
- int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read =
- reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
- nullptr, values, &values_read));
-
- ArrowCType* out_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
- std::copy(values, values + values_read, out_ptr + valid_bits_idx_);
- valid_bits_idx_ += values_read;
-
- return Status::OK();
-}
-
-#define NONNULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \
- template <> \
- Status PrimitiveImpl::ReadNonNullableBatch<ArrowType, ParquetType>( \
- TypedColumnReader<ParquetType> * reader, int64_t values_to_read, \
- int64_t * levels_read) { \
- int64_t values_read; \
- CType* out_ptr = reinterpret_cast<CType*>(data_buffer_ptr_); \
- PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch( \
- static_cast<int>(values_to_read), nullptr, nullptr, \
- out_ptr + valid_bits_idx_, &values_read)); \
- \
- valid_bits_idx_ += values_read; \
- \
- return Status::OK(); \
- }
-
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
-NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
- TypedColumnReader<Int96Type>* reader, int64_t values_to_read, int64_t* levels_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
- auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
- int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read =
- reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
- nullptr, values, &values_read));
-
- int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
- for (int64_t i = 0; i < values_read; i++) {
- *out_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
- }
- valid_bits_idx_ += values_read;
-
- return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>(
- TypedColumnReader<Int32Type>* reader, int64_t values_to_read, int64_t* levels_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
- auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
- int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read =
- reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
- nullptr, values, &values_read));
-
- int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_) + valid_bits_idx_;
- for (int64_t i = 0; i < values_read; i++) {
- *out_ptr++ = static_cast<int64_t>(values[i]) * 86400000;
- }
- valid_bits_idx_ += values_read;
-
- return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
- TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
- int64_t* levels_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
- auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
- int64_t values_read;
- PARQUET_CATCH_NOT_OK(*levels_read =
- reader->ReadBatch(static_cast<int>(values_to_read), nullptr,
- nullptr, values, &values_read));
-
- for (int64_t i = 0; i < values_read; i++) {
- if (values[i]) {
- ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_);
- }
- valid_bits_idx_++;
- }
-
- return Status::OK();
-}
-
-template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::ReadNullableBatch(TypedColumnReader<ParquetType>* reader,
- int16_t* def_levels, int16_t* rep_levels,
- int64_t values_to_read, int64_t* levels_read,
- int64_t* values_read) {
- using ArrowCType = typename ArrowType::c_type;
- using ParquetCType = typename ParquetType::c_type;
-
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
- auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
- int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
- static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
- valid_bits_idx_, levels_read, values_read, &null_count));
-
- auto data_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
- INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
-
- for (int64_t i = 0; i < *values_read; i++) {
- if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
- data_ptr[valid_bits_idx_ + i] = values[i];
- }
- READ_NEXT_BITSET(valid_bits_ptr_);
- }
- null_count_ += null_count;
- valid_bits_idx_ += *values_read;
-
- return Status::OK();
-}
-
-#define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \
- template <> \
- Status PrimitiveImpl::ReadNullableBatch<ArrowType, ParquetType>( \
- TypedColumnReader<ParquetType> * reader, int16_t * def_levels, \
- int16_t * rep_levels, int64_t values_to_read, int64_t * levels_read, \
- int64_t * values_read) { \
- auto data_ptr = reinterpret_cast<CType*>(data_buffer_ptr_); \
- int64_t null_count; \
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced( \
- static_cast<int>(values_to_read), def_levels, rep_levels, \
- data_ptr + valid_bits_idx_, valid_bits_ptr_, valid_bits_idx_, levels_read, \
- values_read, &null_count)); \
- \
- valid_bits_idx_ += *values_read; \
- null_count_ += null_count; \
- \
- return Status::OK(); \
- }
-
-NULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
-NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
-NULLABLE_BATCH_FAST_PATH(::arrow::Date32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::TimestampType, Int64Type, int64_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t)
-NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t)
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::TimestampType, Int96Type>(
- TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int16_t* rep_levels,
- int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false));
- auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
- int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
- static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
- valid_bits_idx_, levels_read, values_read, &null_count));
-
- auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
- INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
- for (int64_t i = 0; i < *values_read; i++) {
- if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
- data_ptr[valid_bits_idx_ + i] = impala_timestamp_to_nanoseconds(values[i]);
- }
- READ_NEXT_BITSET(valid_bits_ptr_);
- }
- null_count_ += null_count;
- valid_bits_idx_ += *values_read;
-
- return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::Date64Type, Int32Type>(
- TypedColumnReader<Int32Type>* reader, int16_t* def_levels, int16_t* rep_levels,
- int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false));
- auto values = reinterpret_cast<int32_t*>(values_buffer_.mutable_data());
- int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
- static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
- valid_bits_idx_, levels_read, values_read, &null_count));
-
- auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
- INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
- for (int64_t i = 0; i < *values_read; i++) {
- if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
- data_ptr[valid_bits_idx_ + i] = static_cast<int64_t>(values[i]) * 86400000;
- }
- READ_NEXT_BITSET(valid_bits_ptr_);
- }
- null_count_ += null_count;
- valid_bits_idx_ += *values_read;
-
- return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::ReadNullableBatch<::arrow::BooleanType, BooleanType>(
- TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int16_t* rep_levels,
- int64_t values_to_read, int64_t* levels_read, int64_t* values_read) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
- auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
- int64_t null_count;
- PARQUET_CATCH_NOT_OK(reader->ReadBatchSpaced(
- static_cast<int>(values_to_read), def_levels, rep_levels, values, valid_bits_ptr_,
- valid_bits_idx_, levels_read, values_read, &null_count));
-
- INIT_BITSET(valid_bits_ptr_, static_cast<int>(valid_bits_idx_));
- for (int64_t i = 0; i < *values_read; i++) {
- if (bitset_valid_bits_ptr_ & (1 << bit_offset_valid_bits_ptr_)) {
- if (values[i]) {
- ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_ + i);
- }
- }
- READ_NEXT_BITSET(valid_bits_ptr_);
- }
- valid_bits_idx_ += *values_read;
- null_count_ += null_count;
-
- return Status::OK();
-}
-
-template <typename ArrowType>
-Status PrimitiveImpl::InitDataBuffer(int batch_size) {
- using ArrowCType = typename ArrowType::c_type;
- data_buffer_ = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(data_buffer_->Resize(batch_size * sizeof(ArrowCType), false));
- data_buffer_ptr_ = data_buffer_->mutable_data();
-
- return Status::OK();
-}
-
-template <>
-Status PrimitiveImpl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
- data_buffer_ = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(data_buffer_->Resize(::arrow::BitUtil::CeilByte(batch_size) / 8, false));
- data_buffer_ptr_ = data_buffer_->mutable_data();
- memset(data_buffer_ptr_, 0, data_buffer_->size());
-
- return Status::OK();
-}
-
-Status PrimitiveImpl::InitValidBits(int batch_size) {
- valid_bits_idx_ = 0;
- if (descr_->max_definition_level() > 0) {
- int valid_bits_size =
- static_cast<int>(::arrow::BitUtil::CeilByte(batch_size + 1)) / 8;
- valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(valid_bits_buffer_->Resize(valid_bits_size, false));
- valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
- memset(valid_bits_ptr_, 0, valid_bits_size);
- null_count_ = 0;
- }
- return Status::OK();
-}
+template <typename ParquetType>
+Status PrimitiveImpl::WrapIntoListArray(std::shared_ptr<Array>* array) {
+ const int16_t* def_levels = record_reader_->def_levels();
+ const int16_t* rep_levels = record_reader_->rep_levels();
+ const int64_t total_levels_read = record_reader_->levels_position();
-Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
- const int16_t* rep_levels,
- int64_t total_levels_read,
- std::shared_ptr<Array>* array) {
std::shared_ptr<::arrow::Schema> arrow_schema;
RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()},
input_->metadata()->key_value_metadata(),
@@ -1021,8 +687,8 @@ Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
std::shared_ptr<Array> output(*array);
for (int64_t j = list_depth - 1; j >= 0; j--) {
- auto list_type = std::make_shared<::arrow::ListType>(
- std::make_shared<Field>("item", output->type(), nullable[j + 1]));
+ auto list_type =
+ ::arrow::list(::arrow::field("item", output->type(), nullable[j + 1]));
output = std::make_shared<::arrow::ListArray>(
list_type, list_lengths[j], offsets[j], output, valid_bits[j], null_counts[j]);
}
@@ -1032,346 +698,291 @@ Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels,
}
template <typename ArrowType, typename ParquetType>
-Status PrimitiveImpl::TypedReadBatch(int batch_size, std::shared_ptr<Array>* out) {
+struct supports_fast_path_impl {
using ArrowCType = typename ArrowType::c_type;
+ using ParquetCType = typename ParquetType::c_type;
+ static constexpr bool value = std::is_same<ArrowCType, ParquetCType>::value;
+};
- int values_to_read = batch_size;
- int total_levels_read = 0;
- RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size));
- RETURN_NOT_OK(InitValidBits(batch_size));
- if (descr_->max_definition_level() > 0) {
- RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- if (descr_->max_repetition_level() > 0) {
- RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
- while ((values_to_read > 0) && column_reader_) {
- auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
- int64_t values_read;
- int64_t levels_read;
- if (descr_->max_definition_level() == 0) {
- RETURN_NOT_OK((ReadNonNullableBatch<ArrowType, ParquetType>(reader, values_to_read,
- &values_read)));
+template <typename ArrowType>
+struct supports_fast_path_impl<ArrowType, ByteArrayType> {
+ static constexpr bool value = false;
+};
+
+template <typename ArrowType>
+struct supports_fast_path_impl<ArrowType, FLBAType> {
+ static constexpr bool value = false;
+};
+
+template <typename ArrowType, typename ParquetType>
+using supports_fast_path =
+ typename std::enable_if<supports_fast_path_impl<ArrowType, ParquetType>::value>::type;
+
+template <typename ArrowType, typename ParquetType, typename Enable = void>
+struct TransferFunctor {
+ using ArrowCType = typename ArrowType::c_type;
+ using ParquetCType = typename ParquetType::c_type;
+
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ int64_t length = reader->values_written();
+ std::shared_ptr<Buffer> data;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(ArrowCType), &data));
+
+ auto values = reinterpret_cast<const ParquetCType*>(reader->values());
+ auto out_ptr = reinterpret_cast<ArrowCType*>(data->mutable_data());
+ std::copy(values, values + length, out_ptr);
+
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ *out = std::make_shared<ArrayType<ArrowType>>(type, length, data, is_valid,
+ reader->null_count());
} else {
- // As per the defintion and checks for flat (list) columns:
- // descr_->max_definition_level() > 0, <= 3
- RETURN_NOT_OK((ReadNullableBatch<ArrowType, ParquetType>(
- reader, def_levels + total_levels_read, rep_levels + total_levels_read,
- values_to_read, &levels_read, &values_read)));
- total_levels_read += static_cast<int>(levels_read);
- }
- values_to_read -= static_cast<int>(values_read);
- if (!column_reader_->HasNext()) {
- NextRowGroup();
+ *out = std::make_shared<ArrayType<ArrowType>>(type, length, data);
}
+ return Status::OK();
}
+};
- // Shrink arrays as they may be larger than the output.
- RETURN_NOT_OK(data_buffer_->Resize(valid_bits_idx_ * sizeof(ArrowCType)));
- if (descr_->max_definition_level() > 0) {
- if (valid_bits_idx_ < batch_size * 0.8) {
- RETURN_NOT_OK(valid_bits_buffer_->Resize(
- ::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8, false));
+template <typename ArrowType, typename ParquetType>
+struct TransferFunctor<ArrowType, ParquetType,
+ supports_fast_path<ArrowType, ParquetType>> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ int64_t length = reader->values_written();
+ std::shared_ptr<PoolBuffer> values = reader->ReleaseValues();
+
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ *out = std::make_shared<ArrayType<ArrowType>>(type, length, values, is_valid,
+ reader->null_count());
+ } else {
+ *out = std::make_shared<ArrayType<ArrowType>>(type, length, values);
}
- *out = std::make_shared<ArrayType<ArrowType>>(
- field_->type(), valid_bits_idx_, data_buffer_, valid_bits_buffer_, null_count_);
-    // Release the ownership as the Buffer is now part of a new Array
- valid_bits_buffer_.reset();
- } else {
- *out = std::make_shared<ArrayType<ArrowType>>(field_->type(), valid_bits_idx_,
- data_buffer_);
+ return Status::OK();
}
-  // Release the ownership as the Buffer is now part of a new Array
- data_buffer_.reset();
-
-  // Check if we should transform this array into a list array.
- return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
+};
template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::BooleanType, BooleanType>(
- int batch_size, std::shared_ptr<Array>* out) {
- int values_to_read = batch_size;
- int total_levels_read = 0;
- RETURN_NOT_OK(InitDataBuffer<::arrow::BooleanType>(batch_size));
- RETURN_NOT_OK(InitValidBits(batch_size));
- if (descr_->max_definition_level() > 0) {
- RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- if (descr_->max_repetition_level() > 0) {
- RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
- while ((values_to_read > 0) && column_reader_) {
- auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
- int64_t values_read;
- int64_t levels_read;
- int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- if (descr_->max_definition_level() == 0) {
- RETURN_NOT_OK((ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
- reader, values_to_read, &values_read)));
- } else {
-      // As per the definition and checks for flat columns:
- // descr_->max_definition_level() == 1
- RETURN_NOT_OK((ReadNullableBatch<::arrow::BooleanType, BooleanType>(
- reader, def_levels + total_levels_read, rep_levels + total_levels_read,
- values_to_read, &levels_read, &values_read)));
- total_levels_read += static_cast<int>(levels_read);
- }
- values_to_read -= static_cast<int>(values_read);
- if (!column_reader_->HasNext()) {
- NextRowGroup();
+struct TransferFunctor<::arrow::BooleanType, BooleanType> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ int64_t length = reader->values_written();
+ std::shared_ptr<Buffer> data;
+
+ const int64_t buffer_size = BytesForBits(length);
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, buffer_size, &data));
+
+ // Transfer boolean values to packed bitmap
+ auto values = reinterpret_cast<const bool*>(reader->values());
+ uint8_t* data_ptr = data->mutable_data();
+ memset(data_ptr, 0, buffer_size);
+
+ for (int64_t i = 0; i < length; i++) {
+ if (values[i]) {
+ ::arrow::BitUtil::SetBit(data_ptr, i);
+ }
}
- }
- if (descr_->max_definition_level() > 0) {
- // TODO: Shrink arrays in the case they are too large
- if (valid_bits_idx_ < batch_size * 0.8) {
- // Shrink arrays as they are larger than the output.
- // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
- // without the need for a copy. Given a decent underlying allocator this
- // should still free some underlying pages to the OS.
-
- auto data_buffer = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(bool)));
- memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
- data_buffer_ = data_buffer;
-
- auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
- RETURN_NOT_OK(
- valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
- memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
- valid_bits_buffer->size());
- valid_bits_buffer_ = valid_bits_buffer;
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ RETURN_NOT_OK(is_valid->Resize(BytesForBits(length), false));
+ *out = std::make_shared<BooleanArray>(type, length, data, is_valid,
+ reader->null_count());
+ } else {
+ *out = std::make_shared<BooleanArray>(type, length, data);
}
- *out = std::make_shared<BooleanArray>(field_->type(), valid_bits_idx_, data_buffer_,
- valid_bits_buffer_, null_count_);
-    // Release the ownership
- data_buffer_.reset();
- valid_bits_buffer_.reset();
- } else {
- *out = std::make_shared<BooleanArray>(field_->type(), valid_bits_idx_, data_buffer_);
- data_buffer_.reset();
+ return Status::OK();
}
+};
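
Parquet's decoder emits one byte per boolean, while Arrow stores booleans as an LSB-ordered bitmap, hence the re-packing loop above; BytesForBits(n) is the usual (n + 7) / 8. The same packing as a standalone sketch:

    #include <cstdint>
    #include <vector>

    // Pack one-byte-per-value booleans into an LSB-ordered bitmap, mirroring
    // the loop above; ::arrow::BitUtil::SetBit does the same per-bit work.
    std::vector<uint8_t> PackBits(const bool* values, int64_t length) {
      std::vector<uint8_t> bitmap((length + 7) / 8, 0);  // zeroed, like the memset
      for (int64_t i = 0; i < length; ++i) {
        if (values[i]) {
          bitmap[i / 8] |= static_cast<uint8_t>(1u << (i % 8));
        }
      }
      return bitmap;
    }
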
-  // Check if we should transform this array into a list array.
- return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
-
-template <typename ArrowType>
-Status PrimitiveImpl::ReadByteArrayBatch(int batch_size, std::shared_ptr<Array>* out) {
- using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
+template <>
+struct TransferFunctor<::arrow::TimestampType, Int96Type> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ int64_t length = reader->values_written();
+ auto values = reinterpret_cast<const Int96*>(reader->values());
+
+ std::shared_ptr<Buffer> data;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
+
+ auto data_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+ for (int64_t i = 0; i < length; i++) {
+ *data_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
+ }
- int total_levels_read = 0;
- if (descr_->max_definition_level() > 0) {
- RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- if (descr_->max_repetition_level() > 0) {
- RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
- int values_to_read = batch_size;
- BuilderType builder(pool_);
- while ((values_to_read > 0) && column_reader_) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ByteArray), false));
- auto reader = dynamic_cast<TypedColumnReader<ByteArrayType>*>(column_reader_.get());
- int64_t values_read;
- int64_t levels_read;
- auto values = reinterpret_cast<ByteArray*>(values_buffer_.mutable_data());
- PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
- values_to_read, def_levels + total_levels_read,
- rep_levels + total_levels_read, values, &values_read));
- values_to_read -= static_cast<int>(levels_read);
- if (descr_->max_definition_level() == 0) {
- for (int64_t i = 0; i < levels_read; i++) {
- RETURN_NOT_OK(
- builder.Append(reinterpret_cast<const char*>(values[i].ptr), values[i].len));
- }
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ *out = std::make_shared<TimestampArray>(type, length, data, is_valid,
+ reader->null_count());
} else {
- // descr_->max_definition_level() > 0
- int values_idx = 0;
- int nullable_elements = descr_->schema_node()->is_optional();
- for (int64_t i = 0; i < levels_read; i++) {
- if (nullable_elements &&
- (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
- RETURN_NOT_OK(builder.AppendNull());
- } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
- RETURN_NOT_OK(
- builder.Append(reinterpret_cast<const char*>(values[values_idx].ptr),
- values[values_idx].len));
- values_idx++;
- }
- }
- total_levels_read += static_cast<int>(levels_read);
- }
- if (!column_reader_->HasNext()) {
- NextRowGroup();
+ *out = std::make_shared<TimestampArray>(type, length, data);
}
+
+ return Status::OK();
}
+};
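
impala_timestamp_to_nanoseconds is defined earlier in this file. The conventional decoding of the Impala/Hive Int96 layout puts nanoseconds-within-day in the low 8 bytes and a Julian day number in the high 4; a sketch of that conversion, assuming that layout and a little-endian host:

    #include <cstdint>
    #include <cstring>

    // 2440588 is the Julian day of the Unix epoch (1970-01-01).
    int64_t Int96ToNanoseconds(const uint32_t value[3]) {
      int64_t nanos_of_day;
      std::memcpy(&nanos_of_day, value, sizeof(nanos_of_day));  // low 8 bytes
      const int64_t kJulianEpochDay = 2440588;
      const int64_t kNanosPerDay = 86400LL * 1000 * 1000 * 1000;
      return (static_cast<int64_t>(value[2]) - kJulianEpochDay) * kNanosPerDay +
             nanos_of_day;
    }
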
- RETURN_NOT_OK(builder.Finish(out));
-  // Check if we should transform this array into a list array.
- return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
+template <>
+struct TransferFunctor<::arrow::Date64Type, Int32Type> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ int64_t length = reader->values_written();
+ auto values = reinterpret_cast<const int32_t*>(reader->values());
+
+ std::shared_ptr<Buffer> data;
+ RETURN_NOT_OK(::arrow::AllocateBuffer(pool, length * sizeof(int64_t), &data));
+ auto out_ptr = reinterpret_cast<int64_t*>(data->mutable_data());
+
+ for (int64_t i = 0; i < length; i++) {
+ *out_ptr++ = static_cast<int64_t>(values[i]) * kMillisecondsInADay;
+ }
-template <typename ArrowType>
-Status PrimitiveImpl::ReadFLBABatch(int batch_size, int byte_width,
- std::shared_ptr<Array>* out) {
- using BuilderType = typename ::arrow::TypeTraits<ArrowType>::BuilderType;
- int total_levels_read = 0;
- if (descr_->max_definition_level() > 0) {
- RETURN_NOT_OK(def_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- if (descr_->max_repetition_level() > 0) {
- RETURN_NOT_OK(rep_levels_buffer_.Resize(batch_size * sizeof(int16_t), false));
- }
- int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
- int16_t* rep_levels = reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data());
-
- int values_to_read = batch_size;
- BuilderType builder(::arrow::fixed_size_binary(byte_width), pool_);
- while ((values_to_read > 0) && column_reader_) {
- RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(FLBA), false));
- auto reader = dynamic_cast<TypedColumnReader<FLBAType>*>(column_reader_.get());
- int64_t values_read;
- int64_t levels_read;
- auto values = reinterpret_cast<FLBA*>(values_buffer_.mutable_data());
- PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
- values_to_read, def_levels + total_levels_read,
- rep_levels + total_levels_read, values, &values_read));
- values_to_read -= static_cast<int>(levels_read);
- if (descr_->max_definition_level() == 0) {
- for (int64_t i = 0; i < levels_read; i++) {
- RETURN_NOT_OK(builder.Append(values[i].ptr));
- }
+ if (reader->nullable_values()) {
+ std::shared_ptr<PoolBuffer> is_valid = reader->ReleaseIsValid();
+ *out = std::make_shared<::arrow::Date64Array>(type, length, data, is_valid,
+ reader->null_count());
} else {
- int values_idx = 0;
- int nullable_elements = descr_->schema_node()->is_optional();
- for (int64_t i = 0; i < levels_read; i++) {
- if (nullable_elements &&
- (def_levels[i + total_levels_read] == (descr_->max_definition_level() - 1))) {
- RETURN_NOT_OK(builder.AppendNull());
- } else if (def_levels[i + total_levels_read] == descr_->max_definition_level()) {
- RETURN_NOT_OK(builder.Append(values[values_idx].ptr));
- values_idx++;
- }
- }
- total_levels_read += static_cast<int>(levels_read);
- }
- if (!column_reader_->HasNext()) {
- NextRowGroup();
+ *out = std::make_shared<::arrow::Date64Array>(type, length, data);
}
+ return Status::OK();
}
+};
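
Parquet stores DATE as an int32 day count since the Unix epoch, while Arrow's Date64 counts milliseconds, so each element is widened and scaled; for this to be correct, kMillisecondsInADay (defined elsewhere in the file) must be 86400000. The per-element operation, written out:

    #include <cstdint>

    // days-since-epoch -> milliseconds-since-epoch; 86400000 is the assumed
    // value of kMillisecondsInADay.
    inline int64_t DaysToMillis(int32_t days) {
      return static_cast<int64_t>(days) * 86400000LL;
    }
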
- RETURN_NOT_OK(builder.Finish(out));
-  // Check if we should transform this array into a list array.
- return WrapIntoListArray(def_levels, rep_levels, total_levels_read, out);
-}
-
-template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>(
- int batch_size, std::shared_ptr<Array>* out) {
- return ReadByteArrayBatch<::arrow::BinaryType>(batch_size, out);
-}
+template <typename ArrowType, typename ParquetType>
+struct TransferFunctor<
+ ArrowType, ParquetType,
+ typename std::enable_if<std::is_same<ParquetType, ByteArrayType>::value ||
+ std::is_same<ParquetType, FLBAType>::value>::type> {
+ Status operator()(RecordReader* reader, MemoryPool* pool,
+ const std::shared_ptr<::arrow::DataType>& type,
+ std::shared_ptr<Array>* out) {
+ RETURN_NOT_OK(reader->builder()->Finish(out));
+
+ if (type->id() == ::arrow::Type::STRING) {
+ // Convert from BINARY type to STRING
+ auto new_data = (*out)->data()->ShallowCopy();
+ new_data->type = type;
+ RETURN_NOT_OK(::arrow::MakeArray(new_data, out));
+ }
+ return Status::OK();
+ }
+};
-template <>
-Status PrimitiveImpl::TypedReadBatch<::arrow::StringType, ByteArrayType>(
- int batch_size, std::shared_ptr<Array>* out) {
- return ReadByteArrayBatch<::arrow::StringType>(batch_size, out);
-}
+#define TRANSFER_DATA(ArrowType, ParquetType) \
+ TransferFunctor<ArrowType, ParquetType> func; \
+ RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out)); \
+ RETURN_NOT_OK(WrapIntoListArray<ParquetType>(out))
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
- case ::arrow::Type::ENUM: \
- return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
- break;
+#define TRANSFER_CASE(ENUM, ArrowType, ParquetType) \
+ case ::arrow::Type::ENUM: { \
+ TRANSFER_DATA(ArrowType, ParquetType); \
+ } break;
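
Expanding one case by hand shows how the two macros cooperate; TRANSFER_CASE(INT32, ::arrow::Int32Type, Int32Type) becomes, roughly:

    case ::arrow::Type::INT32: {
      TransferFunctor<::arrow::Int32Type, Int32Type> func;
      RETURN_NOT_OK(func(record_reader_.get(), pool_, field_->type(), out));
      RETURN_NOT_OK(WrapIntoListArray<Int32Type>(out));
    } break;
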
-Status PrimitiveImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
- if (!column_reader_) {
+Status PrimitiveImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+ if (!record_reader_->HasMoreData()) {
// Exhausted all row groups.
*out = nullptr;
return Status::OK();
}
- switch (field_->type()->id()) {
- case ::arrow::Type::NA:
- *out = std::make_shared<::arrow::NullArray>(batch_size);
- return Status::OK();
- break;
- TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
- TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
- TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
- TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
- TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
- TYPED_BATCH_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
- TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
- TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
- TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
- TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
- TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
- TYPED_BATCH_CASE(STRING, ::arrow::StringType, ByteArrayType)
- TYPED_BATCH_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
- TYPED_BATCH_CASE(DATE32, ::arrow::Date32Type, Int32Type)
- TYPED_BATCH_CASE(DATE64, ::arrow::Date64Type, Int32Type)
- case ::arrow::Type::FIXED_SIZE_BINARY: {
- int32_t byte_width =
- static_cast<::arrow::FixedSizeBinaryType*>(field_->type().get())->byte_width();
- return ReadFLBABatch<::arrow::FixedSizeBinaryType>(batch_size, byte_width, out);
- break;
+ if (field_->type()->id() == ::arrow::Type::NA) {
+ *out = std::make_shared<::arrow::NullArray>(records_to_read);
+ return Status::OK();
+ }
+
+ try {
+ // Pre-allocation gives much better performance for flat columns
+ record_reader_->Reserve(records_to_read);
+
+ record_reader_->Reset();
+ while (records_to_read > 0) {
+ if (!record_reader_->HasMoreData()) {
+ break;
+ }
+ int64_t records_read = record_reader_->ReadRecords(records_to_read);
+ records_to_read -= records_read;
+ if (records_read == 0) {
+ NextRowGroup();
+ }
}
+ } catch (const ::parquet::ParquetException& e) {
+ return ::arrow::Status::IOError(e.what());
+ }
+
+ switch (field_->type()->id()) {
+ TRANSFER_CASE(BOOL, ::arrow::BooleanType, BooleanType)
+ TRANSFER_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
+ TRANSFER_CASE(INT8, ::arrow::Int8Type, Int32Type)
+ TRANSFER_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
+ TRANSFER_CASE(INT16, ::arrow::Int16Type, Int32Type)
+ TRANSFER_CASE(UINT32, ::arrow::UInt32Type, Int32Type)
+ TRANSFER_CASE(INT32, ::arrow::Int32Type, Int32Type)
+ TRANSFER_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
+ TRANSFER_CASE(INT64, ::arrow::Int64Type, Int64Type)
+ TRANSFER_CASE(FLOAT, ::arrow::FloatType, FloatType)
+ TRANSFER_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
+ TRANSFER_CASE(STRING, ::arrow::StringType, ByteArrayType)
+ TRANSFER_CASE(BINARY, ::arrow::BinaryType, ByteArrayType)
+ TRANSFER_CASE(DATE32, ::arrow::Date32Type, Int32Type)
+ TRANSFER_CASE(DATE64, ::arrow::Date64Type, Int32Type)
+ TRANSFER_CASE(FIXED_SIZE_BINARY, ::arrow::FixedSizeBinaryType, FLBAType)
case ::arrow::Type::TIMESTAMP: {
::arrow::TimestampType* timestamp_type =
static_cast<::arrow::TimestampType*>(field_->type().get());
switch (timestamp_type->unit()) {
case ::arrow::TimeUnit::MILLI:
- return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
- break;
- case ::arrow::TimeUnit::MICRO:
- return TypedReadBatch<::arrow::TimestampType, Int64Type>(batch_size, out);
- break;
- case ::arrow::TimeUnit::NANO:
- return TypedReadBatch<::arrow::TimestampType, Int96Type>(batch_size, out);
- break;
+ case ::arrow::TimeUnit::MICRO: {
+ TRANSFER_DATA(::arrow::TimestampType, Int64Type);
+ } break;
+ case ::arrow::TimeUnit::NANO: {
+ TRANSFER_DATA(::arrow::TimestampType, Int96Type);
+ } break;
default:
return Status::NotImplemented("TimeUnit not supported");
}
break;
}
- TYPED_BATCH_CASE(TIME32, ::arrow::Time32Type, Int32Type)
- TYPED_BATCH_CASE(TIME64, ::arrow::Time64Type, Int64Type)
+ TRANSFER_CASE(TIME32, ::arrow::Time32Type, Int32Type)
+ TRANSFER_CASE(TIME64, ::arrow::Time64Type, Int64Type)
default:
std::stringstream ss;
ss << "No support for reading columns of type " << field_->type()->ToString();
return Status::NotImplemented(ss.str());
}
+
+ return Status::OK();
}
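
The loop above is the heart of the new contract: ReadRecords returns whole records rather than raw values or levels, and a return of zero means the current column chunk is drained, so NextRowGroup must supply the next one before reading can continue. From the caller's side the batch API is unchanged apart from the wider size type; a hypothetical read loop over a single column might look like:

    // Hypothetical caller sketch; `column_reader` would come from the file
    // reader's column access API. Per the header below, NextBatch sets *out
    // to nullptr once every row group has been exhausted.
    std::shared_ptr<::arrow::Array> batch;
    while (true) {
      ::arrow::Status st = column_reader->NextBatch(4096, &batch);
      if (!st.ok() || batch == nullptr) break;  // error, or end of data
      // ... consume `batch` ...
    }
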
-void PrimitiveImpl::NextRowGroup() { column_reader_ = input_->Next(); }
+void PrimitiveImpl::NextRowGroup() {
+ std::unique_ptr<PageReader> page_reader = input_->NextChunk();
+ record_reader_->SetPageReader(std::move(page_reader));
+}
-Status PrimitiveImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
- *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
- *length = def_levels_buffer_.size() / sizeof(int16_t);
+Status PrimitiveImpl::GetDefLevels(const int16_t** data, size_t* length) {
+ *data = record_reader_->def_levels();
+ *length = record_reader_->levels_written();
return Status::OK();
}
-Status PrimitiveImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
- *data = reinterpret_cast<ValueLevelsPtr>(rep_levels_buffer_.data());
- *length = rep_levels_buffer_.size() / sizeof(int16_t);
+Status PrimitiveImpl::GetRepLevels(const int16_t** data, size_t* length) {
+ *data = record_reader_->rep_levels();
+ *length = record_reader_->levels_written();
return Status::OK();
}
-ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}
+ColumnReader::ColumnReader(std::unique_ptr<ColumnReaderImpl> impl)
+ : impl_(std::move(impl)) {}
ColumnReader::~ColumnReader() {}
-Status ColumnReader::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
- return impl_->NextBatch(batch_size, out);
+Status ColumnReader::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
+ return impl_->NextBatch(records_to_read, out);
}
// StructImpl methods
@@ -1380,7 +991,7 @@ Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out
int64_t* null_count_out) {
std::shared_ptr<Buffer> null_bitmap;
auto null_count = 0;
- ValueLevelsPtr def_levels_data;
+ const int16_t* def_levels_data;
size_t def_levels_length;
RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length));
RETURN_NOT_OK(GetEmptyBitmap(pool_, def_levels_length, &null_bitmap));
@@ -1402,7 +1013,7 @@ Status StructImpl::DefLevelsToNullArray(std::shared_ptr<Buffer>* null_bitmap_out
// TODO(itaiin): Consider caching the results of this calculation -
// note that this is only used once for each read for now
-Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
+Status StructImpl::GetDefLevels(const int16_t** data, size_t* length) {
*data = nullptr;
if (children_.size() == 0) {
// Empty struct
@@ -1411,7 +1022,7 @@ Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
}
// We have at least one child
- ValueLevelsPtr child_def_levels;
+ const int16_t* child_def_levels;
size_t child_length;
RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length));
auto size = child_length * sizeof(int16_t);
@@ -1438,27 +1049,27 @@ Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) {
std::max(result_levels[i], std::min(child_def_levels[i], struct_def_level_));
}
}
- *data = reinterpret_cast<ValueLevelsPtr>(def_levels_buffer_.data());
+ *data = reinterpret_cast<const int16_t*>(def_levels_buffer_.data());
*length = child_length;
return Status::OK();
}
-void StructImpl::InitField(const NodePtr& node,
- const std::vector<std::shared_ptr<Impl>>& children) {
+void StructImpl::InitField(
+ const Node* node, const std::vector<std::shared_ptr<ColumnReaderImpl>>& children) {
   // Make a shallow node-to-field conversion from the children fields
std::vector<std::shared_ptr<::arrow::Field>> fields(children.size());
for (size_t i = 0; i < children.size(); i++) {
fields[i] = children[i]->field();
}
- auto type = std::make_shared<::arrow::StructType>(fields);
- field_ = std::make_shared<Field>(node->name(), type);
+ auto type = ::arrow::struct_(fields);
+ field_ = ::arrow::field(node->name(), type);
}
-Status StructImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) {
+Status StructImpl::GetRepLevels(const int16_t** data, size_t* length) {
return Status::NotImplemented("GetRepLevels is not implemented for struct");
}
-Status StructImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
+Status StructImpl::NextBatch(int64_t records_to_read, std::shared_ptr<Array>* out) {
std::vector<std::shared_ptr<Array>> children_arrays;
std::shared_ptr<Buffer> null_bitmap;
int64_t null_count;
@@ -1467,16 +1078,23 @@ Status StructImpl::NextBatch(int batch_size, std::shared_ptr<Array>* out) {
for (auto& child : children_) {
std::shared_ptr<Array> child_array;
- RETURN_NOT_OK(child->NextBatch(batch_size, &child_array));
-
+ RETURN_NOT_OK(child->NextBatch(records_to_read, &child_array));
children_arrays.push_back(child_array);
}
RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count));
- *out = std::make_shared<StructArray>(field()->type(), batch_size, children_arrays,
- null_bitmap, null_count);
+ int64_t struct_length = children_arrays[0]->length();
+ for (size_t i = 1; i < children_arrays.size(); ++i) {
+ if (children_arrays[i]->length() != struct_length) {
+ // TODO(wesm): This should really only occur if the Parquet file is
+ // malformed. Should this be a DCHECK?
+ return Status::Invalid("Struct children had different lengths");
+ }
+ }
+ *out = std::make_shared<StructArray>(field()->type(), struct_length, children_arrays,
+ null_bitmap, null_count);
return Status::OK();
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index ce82375..faaef9a 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -173,7 +173,7 @@ class PARQUET_EXPORT FileReader {
// might change in the future.
class PARQUET_EXPORT ColumnReader {
public:
- class PARQUET_NO_EXPORT Impl;
+ class PARQUET_NO_EXPORT ColumnReaderImpl;
virtual ~ColumnReader();
// Scan the next array of the indicated size. The actual size of the
@@ -185,11 +185,11 @@ class PARQUET_EXPORT ColumnReader {
//
// Returns Status::OK on a successful read, including if you have exhausted
// the data available in the file.
- ::arrow::Status NextBatch(int batch_size, std::shared_ptr<::arrow::Array>* out);
+ ::arrow::Status NextBatch(int64_t batch_size, std::shared_ptr<::arrow::Array>* out);
private:
- std::unique_ptr<Impl> impl_;
- explicit ColumnReader(std::unique_ptr<Impl> impl);
+ std::unique_ptr<ColumnReaderImpl> impl_;
+ explicit ColumnReader(std::unique_ptr<ColumnReaderImpl> impl);
friend class FileReader;
friend class PrimitiveImpl;