From: uwe@apache.org
To: commits@parquet.apache.org
Reply-To: dev@parquet.apache.org
Subject: parquet-cpp git commit: PARQUET-911: [C++] Support nested structs in parquet_arrow
Date: Thu, 22 Jun 2017 16:14:41 +0000 (UTC)

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 99759a38b -> 29ed01ea7


PARQUET-911: [C++] Support nested structs in parquet_arrow

Support for simple StructArray reads. Not supported in conjunction with
lists yet. (Usage sketches for the new APIs follow the diff below.)

Author: Itai Incze

Closes #312 from itaiin/PARQUET-911 and squashes the following commits:

beb16f5 [Itai Incze] document ReadSchemaField API and fix Appveyor errors
9fe3038 [Itai Incze] review changes + BooleanArray bugfix
420bb76 [Itai Incze] fix struct field type bug + minor changes
c23f3fb [Itai Incze] Refactor per code reviews
11a12c3 [Itai Incze] fix msvc compiler errors
283f08d [Itai Incze] fix schema field ordering on partial read
aeb1384 [Itai Incze] fix per code review
8bd47e8 [Itai Incze] Fix column index bug and extend tests
01d69ee [Itai Incze] Fix osx compiler c-array init error
14cbef8 [Itai Incze] support for simple StructArray reads. Not supported in conjunction with lists yet.


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/29ed01ea
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/29ed01ea
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/29ed01ea

Branch: refs/heads/master
Commit: 29ed01ea70fb5eba22b6df547c33fca047e08031
Parents: 99759a3
Author: Itai Incze
Authored: Thu Jun 22 18:14:35 2017 +0200
Committer: Uwe L. Korn
Committed: Thu Jun 22 18:14:35 2017 +0200

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc | 327 +++++++++++++++++--
 src/parquet/arrow/reader.cc                   | 350 ++++++++++++++++++---
 src/parquet/arrow/reader.h                    |  40 ++-
 src/parquet/arrow/schema.cc                   |   9 +-
 src/parquet/arrow/test-util.h                 |   2 +-
 src/parquet/column/column-reader-test.cc      |  13 +-
 src/parquet/column/reader.h                   |  52 ++-
 src/parquet/schema-test.cc                    |  26 ++
 src/parquet/schema.cc                         |  80 ++++-
 src/parquet/schema.h                          |  22 ++
 src/parquet/util/schema-util.h                |  84 +++++
 11 files changed, 904 insertions(+), 101 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index b9c77f1..16dddb0 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -50,6 +50,7 @@ using arrow::PrimitiveArray; using arrow::Status; using arrow::Table; using arrow::TimeUnit; +using arrow::ArrayVisitor; using ArrowId = ::arrow::Type; using ParquetType = parquet::Type; @@ -387,7 +388,7 @@ class TestParquetIO : public ::testing::Test { // Also test that slice offsets are respected values = values->Slice(5, values->length() - 5); std::shared_ptr lists; - ASSERT_OK(MakeListArary( + ASSERT_OK(MakeListArray( values, size, nullable_lists ? null_count : 0, nullable_elements, &lists)); *out = MakeSimpleTable(lists->Slice(3, size - 6), nullable_lists); } @@ -399,10 +400,10 @@ class TestParquetIO : public ::testing::Test { ASSERT_OK(NullableArray( size * 6, nullable_elements ? null_count : 0, kDefaultSeed, &values)); std::shared_ptr lists; - ASSERT_OK(MakeListArary( + ASSERT_OK(MakeListArray( values, size * 3, nullable_lists ? null_count : 0, nullable_elements, &lists)); std::shared_ptr parent_lists; - ASSERT_OK(MakeListArary(lists, size, nullable_parent_lists ? null_count : 0, + ASSERT_OK(MakeListArray(lists, size, nullable_parent_lists ?
null_count : 0, nullable_lists, &parent_lists)); *out = MakeSimpleTable(parent_lists, nullable_parent_lists); } @@ -1080,22 +1081,17 @@ TEST(TestArrowWrite, CheckChunkSize) { Invalid, WriteTable(*table, ::arrow::default_memory_pool(), sink, chunk_size)); } -class TestNestedSchemaRead : public ::testing::Test { +class TestNestedSchemaRead : public ::testing::TestWithParam { protected: - virtual void SetUp() { - // We are using parquet low-level file api to create the nested parquet - CreateNestedParquet(); - InitReader(&reader_); - } + // make it *3 to make it easily divisible by 3 + const int NUM_SIMPLE_TEST_ROWS = SMALL_SIZE * 3; + std::shared_ptr<::arrow::Int32Array> values_array_ = nullptr; - void InitReader(std::shared_ptr* out) { + void InitReader() { std::shared_ptr buffer = nested_parquet_->GetBuffer(); - std::unique_ptr reader; ASSERT_OK_NO_THROW( OpenFile(std::make_shared(buffer), ::arrow::default_memory_pool(), - ::parquet::default_reader_properties(), nullptr, &reader)); - - *out = std::move(reader); + ::parquet::default_reader_properties(), nullptr, &reader_)); } void InitNewParquetFile(const std::shared_ptr& schema, int num_rows) { @@ -1110,61 +1106,330 @@ class TestNestedSchemaRead : public ::testing::Test { writer_->Close(); } - void CreateNestedParquet() { + void MakeValues(int num_rows) { + std::shared_ptr arr; + ASSERT_OK(NullableArray<::arrow::Int32Type>(num_rows, 0, kDefaultSeed, &arr)); + values_array_ = std::dynamic_pointer_cast<::arrow::Int32Array>(arr); + } + + void WriteColumnData(size_t num_rows, int16_t* def_levels, + int16_t* rep_levels, int32_t* values) { + auto typed_writer = static_cast*>( + row_group_writer_->NextColumn()); + typed_writer->WriteBatch(num_rows, def_levels, rep_levels, values); + } + + void ValidateArray(const Array& array, size_t expected_nulls) { + ASSERT_EQ(array.length(), values_array_->length()); + ASSERT_EQ(array.null_count(), expected_nulls); + // Also independently count the nulls + auto local_null_count = 0; + for (int i = 0; i < array.length(); i++) { + if (array.IsNull(i)) { + local_null_count++; + } + } + ASSERT_EQ(local_null_count, expected_nulls); + } + + void ValidateColumnArray(const ::arrow::Int32Array& array, + size_t expected_nulls) { + ValidateArray(array, expected_nulls); + + int j = 0; + for (int i = 0; i < values_array_->length(); i++) { + if (array.IsNull(i)) { + continue; + } + ASSERT_EQ(array.Value(i), values_array_->Value(j)); + j++; + } + } + + void ValidateTableArrayTypes(const Table& table) { + for (int i = 0; i < table.num_columns(); i++) { + const std::shared_ptr<::arrow::Field> schema_field = table.schema()->field(i); + const std::shared_ptr column = table.column(i); + // Compare with the column field + ASSERT_TRUE(schema_field->Equals(column->field())); + // Compare with the array type + ASSERT_TRUE(schema_field->type()->Equals(column->data()->chunk(0)->type())); + } + } + + // A parquet with a simple nested schema + void CreateSimpleNestedParquet(Repetition::type struct_repetition) { std::vector parquet_fields; - std::shared_ptr values; + // TODO(itaiin): We are using parquet low-level file api to create the nested parquet + // this needs to change when a nested writes are implemented // create the schema: - // required group group1 { + // group group1 { // required int32 leaf1; - // required int32 leaf2; + // optional int32 leaf2; // } // required int32 leaf3; - parquet_fields.push_back(GroupNode::Make("group1", Repetition::REQUIRED, + parquet_fields.push_back(GroupNode::Make("group1", struct_repetition, 
{PrimitiveNode::Make("leaf1", Repetition::REQUIRED, ParquetType::INT32), - PrimitiveNode::Make("leaf2", Repetition::REQUIRED, ParquetType::INT32)})); + PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, ParquetType::INT32)})); parquet_fields.push_back( PrimitiveNode::Make("leaf3", Repetition::REQUIRED, ParquetType::INT32)); - const int num_columns = 3; auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields); - InitNewParquetFile(std::static_pointer_cast(schema_node), 0); + // Create definition levels for the different columns that contain interleaved + // nulls and values at all nesting levels + + // definition levels for optional fields + std::vector leaf1_def_levels(NUM_SIMPLE_TEST_ROWS); + std::vector leaf2_def_levels(NUM_SIMPLE_TEST_ROWS); + std::vector leaf3_def_levels(NUM_SIMPLE_TEST_ROWS); + for (int i = 0; i < NUM_SIMPLE_TEST_ROWS; i++) { + // leaf1 is required within the optional group1, so it is only null + // when the group is null + leaf1_def_levels[i] = (i % 3 == 0) ? 0 : 1; + // leaf2 is optional, can be null in the primitive (def-level 1) or + // struct level (def-level 0) + leaf2_def_levels[i] = i % 3; + // leaf3 is required + leaf3_def_levels[i] = 0; + } - for (int i = 0; i < num_columns; i++) { - auto column_writer = row_group_writer_->NextColumn(); - auto typed_writer = reinterpret_cast*>(column_writer); - typed_writer->WriteBatch(0, nullptr, nullptr, nullptr); + std::vector rep_levels(NUM_SIMPLE_TEST_ROWS, 0); + + // Produce values for the columns + MakeValues(NUM_SIMPLE_TEST_ROWS); + int32_t* values = reinterpret_cast(values_array_->data()->mutable_data()); + + // Create the actual parquet file + InitNewParquetFile(std::static_pointer_cast(schema_node), + NUM_SIMPLE_TEST_ROWS); + + // leaf1 column + WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf1_def_levels.data(), + rep_levels.data(), values); + // leaf2 column + WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf2_def_levels.data(), + rep_levels.data(), values); + // leaf3 column + WriteColumnData(NUM_SIMPLE_TEST_ROWS, leaf3_def_levels.data(), + rep_levels.data(), values); + + FinalizeParquetFile(); + InitReader(); + } + + NodePtr CreateSingleTypedNestedGroup(int index, int depth, int num_children, + Repetition::type node_repetition, ParquetType::type leaf_type) { + std::vector children; + + for (int i = 0; i < num_children; i++) { + if (depth <= 1) { + children.push_back(PrimitiveNode::Make("leaf", + node_repetition, leaf_type)); + } else { + children.push_back(CreateSingleTypedNestedGroup(i, depth - 1, num_children, + node_repetition, leaf_type)); + } } + std::stringstream ss; + ss << "group-" << depth << "-" << index; + return NodePtr(GroupNode::Make(ss.str(), node_repetition, children)); + } + + // A deeply nested schema + void CreateMultiLevelNestedParquet(int num_trees, int tree_depth, + int num_children, int num_rows, Repetition::type node_repetition) { + // Create the schema + std::vector parquet_fields; + for (int i = 0; i < num_trees; i++) { + parquet_fields.push_back(CreateSingleTypedNestedGroup(i, tree_depth, num_children, + node_repetition, ParquetType::INT32)); + } + auto schema_node = GroupNode::Make("schema", Repetition::REQUIRED, parquet_fields); + + int num_columns = num_trees * static_cast((std::pow(num_children, tree_depth))); + + std::vector def_levels(num_rows); + std::vector rep_levels(num_rows); + for (int i = 0; i < num_rows; i++) { + if (node_repetition == Repetition::REQUIRED) { + def_levels[i] = 0; // all is required + } else { + def_levels[i] = i % tree_depth; // all is 
optional + } + rep_levels[i] = 0; // none is repeated + } + + // Produce values for the columns + MakeValues(num_rows); + int32_t* values = reinterpret_cast(values_array_->data()->mutable_data()); + + // Create the actual parquet file + InitNewParquetFile(std::static_pointer_cast(schema_node), num_rows); + + for (int i = 0; i < num_columns; i++) { + WriteColumnData(num_rows, def_levels.data(), rep_levels.data(), values); + } FinalizeParquetFile(); + InitReader(); } + class DeepParquetTestVisitor : public ArrayVisitor { + public: + DeepParquetTestVisitor(Repetition::type node_repetition, + std::shared_ptr<::arrow::Int32Array> expected) : + node_repetition_(node_repetition), expected_(expected) {} + + Status Validate(std::shared_ptr tree) { + return tree->Accept(this); + } + + virtual Status Visit(const ::arrow::Int32Array& array) { + if (node_repetition_ == Repetition::REQUIRED) { + if (!array.Equals(expected_)) { + return Status::Invalid("leaf array data mismatch"); + } + } else if (node_repetition_ == Repetition::OPTIONAL) { + if (array.length() != expected_->length()) { + return Status::Invalid("Bad leaf array length"); + } + // expect only 1 value every `depth` row + if (array.null_count() != SMALL_SIZE) { + return Status::Invalid("Unexpected null count"); + } + } else { + return Status::NotImplemented("Unsupported repetition"); + } + return Status::OK(); + } + + virtual Status Visit(const ::arrow::StructArray& array) { + for (auto& child : array.fields()) { + if (node_repetition_ == Repetition::REQUIRED) { + RETURN_NOT_OK(child->Accept(this)); + } else if (node_repetition_ == Repetition::OPTIONAL) { + // Null count Must be a multiple of SMALL_SIZE + if (array.null_count() % SMALL_SIZE != 0) { + return Status::Invalid("Unexpected struct null count"); + } + } else { + return Status::NotImplemented("Unsupported repetition"); + } + } + return Status::OK(); + } + + private: + Repetition::type node_repetition_; + std::shared_ptr<::arrow::Int32Array> expected_; + }; + std::shared_ptr nested_parquet_; - std::shared_ptr reader_; + std::unique_ptr reader_; std::unique_ptr writer_; RowGroupWriter* row_group_writer_; }; TEST_F(TestNestedSchemaRead, ReadIntoTableFull) { + CreateSimpleNestedParquet(Repetition::OPTIONAL); + std::shared_ptr table; - ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table)); + ASSERT_OK_NO_THROW(reader_->ReadTable(&table)); + ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); + ASSERT_EQ(table->num_columns(), 2); + ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2); + ValidateTableArrayTypes(*table); + + auto struct_field_array = std::static_pointer_cast<::arrow::StructArray>( + table->column(0)->data()->chunk(0)); + auto leaf1_array = std::static_pointer_cast<::arrow::Int32Array>( + struct_field_array->field(0)); + auto leaf2_array = std::static_pointer_cast<::arrow::Int32Array>( + struct_field_array->field(1)); + auto leaf3_array = std::static_pointer_cast<::arrow::Int32Array>( + table->column(1)->data()->chunk(0)); + + // validate struct and leaf arrays + + // validate struct array + ValidateArray(*struct_field_array, NUM_SIMPLE_TEST_ROWS / 3); + // validate leaf1 + ValidateColumnArray(*leaf1_array, NUM_SIMPLE_TEST_ROWS / 3); + // validate leaf2 + ValidateColumnArray(*leaf2_array, NUM_SIMPLE_TEST_ROWS * 2/ 3); + // validate leaf3 + ValidateColumnArray(*leaf3_array, 0); } TEST_F(TestNestedSchemaRead, ReadTablePartial) { + CreateSimpleNestedParquet(Repetition::OPTIONAL); std::shared_ptr
table; - ASSERT_RAISES(NotImplemented, reader_->ReadTable({0, 2}, &table)); - ASSERT_RAISES(NotImplemented, reader_->ReadTable({0, 1}, &table)); + // columns: {group1.leaf1, leaf3} + ASSERT_OK_NO_THROW(reader_->ReadTable({0, 2}, &table)); + ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); + ASSERT_EQ(table->num_columns(), 2); + ASSERT_EQ(table->schema()->field(0)->name(), "group1"); + ASSERT_EQ(table->schema()->field(1)->name(), "leaf3"); + ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 1); + ValidateTableArrayTypes(*table); + + // columns: {group1.leaf1, group1.leaf2} + ASSERT_OK_NO_THROW(reader_->ReadTable({0, 1}, &table)); + ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); + ASSERT_EQ(table->num_columns(), 1); + ASSERT_EQ(table->schema()->field(0)->name(), "group1"); + ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 2); + ValidateTableArrayTypes(*table); // columns: {leaf3} ASSERT_OK_NO_THROW(reader_->ReadTable({2}, &table)); - ASSERT_EQ(table->num_rows(), 0); + ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); ASSERT_EQ(table->num_columns(), 1); + ASSERT_EQ(table->schema()->field(0)->name(), "leaf3"); ASSERT_EQ(table->schema()->field(0)->type()->num_children(), 0); + ValidateTableArrayTypes(*table); + + // Test with different ordering + ASSERT_OK_NO_THROW(reader_->ReadTable({2, 0}, &table)); + ASSERT_EQ(table->num_rows(), NUM_SIMPLE_TEST_ROWS); + ASSERT_EQ(table->num_columns(), 2); + ASSERT_EQ(table->schema()->field(0)->name(), "leaf3"); + ASSERT_EQ(table->schema()->field(1)->name(), "group1"); + ASSERT_EQ(table->schema()->field(1)->type()->num_children(), 1); + ValidateTableArrayTypes(*table); +} + +TEST_F(TestNestedSchemaRead, StructAndListTogetherUnsupported) { + CreateSimpleNestedParquet(Repetition::REPEATED); + std::shared_ptr
table; + ASSERT_RAISES(NotImplemented, reader_->ReadTable(&table)); +} + +TEST_P(TestNestedSchemaRead, DeepNestedSchemaRead) { + const int num_trees = 10; + const int depth = 5; + const int num_children = 3; + int num_rows = SMALL_SIZE * depth; + CreateMultiLevelNestedParquet(num_trees, depth, num_children, num_rows, GetParam()); + std::shared_ptr
table; + ASSERT_OK_NO_THROW(reader_->ReadTable(&table)); + ASSERT_EQ(table->num_columns(), num_trees); + ASSERT_EQ(table->num_rows(), num_rows); + + DeepParquetTestVisitor visitor(GetParam(), values_array_); + for (int i = 0; i < table->num_columns(); i++) { + auto tree = table->column(i)->data()->chunk(0); + ASSERT_OK_NO_THROW(visitor.Validate(tree)); + } } +INSTANTIATE_TEST_CASE_P(Repetition_type, TestNestedSchemaRead, + ::testing::Values(Repetition::REQUIRED, Repetition::OPTIONAL)); + TEST(TestArrowReaderAdHoc, Int96BadMemoryAccess) { // PARQUET-995 const char* data_dir = std::getenv("PARQUET_TEST_DATA"); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index a531454..7c1b381 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -28,6 +28,7 @@ #include "parquet/arrow/schema.h" #include "parquet/util/bit-util.h" +#include "parquet/util/schema-util.h" #include "arrow/api.h" @@ -37,11 +38,14 @@ using arrow::Column; using arrow::Field; using arrow::Int32Array; using arrow::ListArray; +using arrow::StructArray; using arrow::MemoryPool; using arrow::PoolBuffer; using arrow::Status; using arrow::Table; +using parquet::schema::NodePtr; + // Help reduce verbosity using ParquetReader = parquet::ParquetFileReader; @@ -179,7 +183,13 @@ class FileReader::Impl { virtual ~Impl() {} Status GetColumn(int i, std::unique_ptr* out); + Status ReadSchemaField(int i, std::shared_ptr* out); + Status ReadSchemaField(int i, const std::vector& indices, + std::shared_ptr* out); + Status GetReaderForNode(int index, const NodePtr& node, const std::vector& indices, + int16_t def_level, std::unique_ptr* out); Status ReadColumn(int i, std::shared_ptr* out); + Status GetSchema(std::shared_ptr<::arrow::Schema>* out); Status GetSchema( const std::vector& indices, std::shared_ptr<::arrow::Schema>* out); Status ReadRowGroup(int row_group_index, const std::vector& indices, @@ -204,9 +214,21 @@ class FileReader::Impl { int num_threads_; }; +typedef const int16_t* ValueLevelsPtr; + class ColumnReader::Impl { public: - Impl(MemoryPool* pool, std::unique_ptr input) + virtual ~Impl() {} + virtual Status NextBatch(int batch_size, std::shared_ptr* out) = 0; + virtual Status GetDefLevels(ValueLevelsPtr* data, size_t* length) = 0; + virtual Status GetRepLevels(ValueLevelsPtr* data, size_t* length) = 0; + virtual const std::shared_ptr field() = 0; +}; + +// Reader implementation for primitive arrays +class PrimitiveImpl: public ColumnReader::Impl { + public: + PrimitiveImpl(MemoryPool* pool, std::unique_ptr input) : pool_(pool), input_(std::move(input)), descr_(input_->descr()), @@ -217,9 +239,9 @@ class ColumnReader::Impl { NextRowGroup(); } - virtual ~Impl() {} + virtual ~PrimitiveImpl() {} - Status NextBatch(int batch_size, std::shared_ptr* out); + Status NextBatch(int batch_size, std::shared_ptr* out) override; template Status TypedReadBatch(int batch_size, std::shared_ptr* out); @@ -243,6 +265,11 @@ class ColumnReader::Impl { Status WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels, int64_t total_values_read, std::shared_ptr* array); + Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override; + Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override; + + const std::shared_ptr field() override { return field_; } + private: void NextRowGroup(); @@ -272,6 +299,36 @@ class 
ColumnReader::Impl { int64_t null_count_; }; +// Reader implementation for struct array +class StructImpl: public ColumnReader::Impl { + public: + explicit StructImpl(const std::vector>& children, + int16_t struct_def_level, MemoryPool* pool, const NodePtr& node) + : children_(children), struct_def_level_(struct_def_level), pool_(pool), + def_levels_buffer_(pool) { + InitField(node, children); + } + + virtual ~StructImpl() {} + + Status NextBatch(int batch_size, std::shared_ptr* out) override; + Status GetDefLevels(ValueLevelsPtr* data, size_t* length) override; + Status GetRepLevels(ValueLevelsPtr* data, size_t* length) override; + const std::shared_ptr field() override { return field_; } + + private: + std::vector> children_; + int16_t struct_def_level_; + MemoryPool* pool_; + std::shared_ptr field_; + PoolBuffer def_levels_buffer_; + + Status DefLevelsToNullArray(std::shared_ptr* null_bitmap, + int64_t* null_count); + void InitField(const NodePtr& node, + const std::vector>& children); +}; + FileReader::FileReader(MemoryPool* pool, std::unique_ptr reader) : impl_(new FileReader::Impl(pool, std::move(reader))) {} @@ -281,11 +338,96 @@ Status FileReader::Impl::GetColumn(int i, std::unique_ptr* out) { std::unique_ptr input(new AllRowGroupsIterator(i, reader_.get())); std::unique_ptr impl( - new ColumnReader::Impl(pool_, std::move(input))); + new PrimitiveImpl(pool_, std::move(input))); *out = std::unique_ptr(new ColumnReader(std::move(impl))); return Status::OK(); } +Status FileReader::Impl::GetReaderForNode(int index, const NodePtr& node, + const std::vector& indices, int16_t def_level, + std::unique_ptr* out) { + + *out = nullptr; + + if (IsSimpleStruct(node)) { + const schema::GroupNode* group = static_cast(node.get()); + std::vector> children; + for (int i = 0; i < group->field_count(); i++) { + std::unique_ptr child_reader; + // TODO(itaiin): Remove the -1 index hack when all types of nested reads + // are supported. This currently just signals the lower level reader resolution + // to abort + RETURN_NOT_OK(GetReaderForNode(index, group->field(i), indices, + def_level + 1, &child_reader)); + if (child_reader != nullptr) { + children.push_back(std::move(child_reader)); + } + } + + if (children.size() > 0) { + *out = std::unique_ptr( + new StructImpl(children, def_level, pool_, node)); + } + } else { + // This should be a flat field case - translate the field index to + // the correct column index by walking down to the leaf node + NodePtr walker = node; + while (!walker->is_primitive()) { + DCHECK(walker->is_group()); + auto group = static_cast(walker.get()); + if (group->field_count() != 1) { + return Status::NotImplemented( + "lists with structs are not supported."); + } + walker = group->field(0); + } + auto column_index = reader_->metadata()->schema()->ColumnIndex(*walker.get()); + + // If the index of the column is found then a reader for the coliumn is needed. + // Otherwise *out keeps the nullptr value. 
+ if (std::find(indices.begin(), indices.end(), column_index) != indices.end()) { + std::unique_ptr reader; + RETURN_NOT_OK(GetColumn(column_index, &reader)); + *out = std::move(reader->impl_); + } + } + + return Status::OK(); +} + +Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr* out) { + std::vector indices(reader_->metadata()->num_columns()); + + for (size_t j = 0; j < indices.size(); ++j) { + indices[j] = static_cast(j); + } + + return ReadSchemaField(i, indices, out); +} + +Status FileReader::Impl::ReadSchemaField(int i, const std::vector& indices, + std::shared_ptr* out) { + auto parquet_schema = reader_->metadata()->schema(); + + auto node = parquet_schema->group_node()->field(i); + std::unique_ptr reader_impl; + + RETURN_NOT_OK(GetReaderForNode(i, node, indices, 1, &reader_impl)); + if (reader_impl == nullptr) { + *out = nullptr; + return Status::OK(); + } + + std::unique_ptr reader(new ColumnReader(std::move(reader_impl))); + + int64_t batch_size = 0; + for (int j = 0; j < reader_->metadata()->num_row_groups(); j++) { + batch_size += reader_->metadata()->RowGroup(j)->ColumnChunk(i)->num_values(); + } + + return reader->NextBatch(static_cast(batch_size), out); +} + Status FileReader::Impl::ReadColumn(int i, std::shared_ptr* out) { std::unique_ptr flat_column_reader; RETURN_NOT_OK(GetColumn(i, &flat_column_reader)); @@ -327,7 +469,7 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index, new SingleRowGroupIterator(column_index, row_group_index, reader_.get())); std::unique_ptr impl( - new ColumnReader::Impl(pool_, std::move(input))); + new PrimitiveImpl(pool_, std::move(input))); ColumnReader flat_column_reader(std::move(impl)); std::shared_ptr array; @@ -357,9 +499,17 @@ Status FileReader::Impl::ReadTable( int nthreads = std::min(num_threads_, num_fields); std::vector> columns(num_fields); - auto ReadColumnFunc = [&indices, &schema, &columns, this](int i) { + // We only need to read schema fields which have columns indicated + // in the indices vector + std::vector field_indices; + if (!ColumnIndicesToFieldIndices(*reader_->metadata()->schema(), + indices, &field_indices)) { + return Status::Invalid("Invalid column index"); + } + + auto ReadColumnFunc = [&indices, &field_indices, &schema, &columns, this](int i) { std::shared_ptr array; - RETURN_NOT_OK(ReadColumn(indices[i], &array)); + RETURN_NOT_OK(ReadSchemaField(field_indices[i], indices, &array)); columns[i] = std::make_shared(schema->field(i), array); return Status::OK(); }; @@ -424,6 +574,14 @@ Status FileReader::ReadColumn(int i, std::shared_ptr* out) { } } +Status FileReader::ReadSchemaField(int i, std::shared_ptr* out) { + try { + return impl_->ReadSchemaField(i, out); + } catch (const ::parquet::ParquetException& e) { + return ::arrow::Status::IOError(e.what()); + } +} + Status FileReader::ReadTable(std::shared_ptr
* out) { try { return impl_->ReadTable(out); @@ -471,7 +629,8 @@ const ParquetFileReader* FileReader::parquet_reader() const { } template -Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader* reader, +Status PrimitiveImpl::ReadNonNullableBatch( + TypedColumnReader* reader, int64_t values_to_read, int64_t* levels_read) { using ArrowCType = typename ArrowType::c_type; using ParquetCType = typename ParquetType::c_type; @@ -491,7 +650,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader* #define NONNULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \ template <> \ - Status ColumnReader::Impl::ReadNonNullableBatch( \ + Status PrimitiveImpl::ReadNonNullableBatch( \ TypedColumnReader * reader, int64_t values_to_read, \ int64_t * levels_read) { \ int64_t values_read; \ @@ -515,7 +674,7 @@ NONNULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t) NONNULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t) template <> -Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>( +Status PrimitiveImpl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>( TypedColumnReader* reader, int64_t values_to_read, int64_t* levels_read) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false)); auto values = reinterpret_cast(values_buffer_.mutable_data()); @@ -533,7 +692,7 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Typ } template <> -Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>( +Status PrimitiveImpl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>( TypedColumnReader* reader, int64_t values_to_read, int64_t* levels_read) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false)); auto values = reinterpret_cast(values_buffer_.mutable_data()); @@ -551,8 +710,8 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::Date64Type, Int32Type>( } template <> -Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>( - TypedColumnReader* reader, int64_t values_to_read, +Status PrimitiveImpl::ReadNonNullableBatch<::arrow::BooleanType, + BooleanType>(TypedColumnReader* reader, int64_t values_to_read, int64_t* levels_read) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false)); auto values = reinterpret_cast(values_buffer_.mutable_data()); @@ -569,7 +728,8 @@ Status ColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanTyp } template -Status ColumnReader::Impl::ReadNullableBatch(TypedColumnReader* reader, +Status PrimitiveImpl::ReadNullableBatch( + TypedColumnReader* reader, int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read, int64_t* levels_read, int64_t* values_read) { using ArrowCType = typename ArrowType::c_type; @@ -599,7 +759,7 @@ Status ColumnReader::Impl::ReadNullableBatch(TypedColumnReader* rea #define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType) \ template <> \ - Status ColumnReader::Impl::ReadNullableBatch( \ + Status PrimitiveImpl::ReadNullableBatch( \ TypedColumnReader * reader, int16_t * def_levels, \ int16_t * rep_levels, int64_t values_to_read, int64_t * levels_read, \ int64_t * values_read) { \ @@ -625,7 +785,7 @@ NULLABLE_BATCH_FAST_PATH(::arrow::Time32Type, Int32Type, int32_t) NULLABLE_BATCH_FAST_PATH(::arrow::Time64Type, Int64Type, int64_t) template <> -Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>( +Status PrimitiveImpl::ReadNullableBatch<::arrow::TimestampType, Int96Type>( 
TypedColumnReader* reader, int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read, int64_t* levels_read, int64_t* values_read) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96), false)); @@ -650,7 +810,7 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::TimestampType, Int96Type>( } template <> -Status ColumnReader::Impl::ReadNullableBatch<::arrow::Date64Type, Int32Type>( +Status PrimitiveImpl::ReadNullableBatch<::arrow::Date64Type, Int32Type>( TypedColumnReader* reader, int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read, int64_t* levels_read, int64_t* values_read) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(int32_t), false)); @@ -675,7 +835,7 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::Date64Type, Int32Type>( } template <> -Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>( +Status PrimitiveImpl::ReadNullableBatch<::arrow::BooleanType, BooleanType>( TypedColumnReader* reader, int16_t* def_levels, int16_t* rep_levels, int64_t values_to_read, int64_t* levels_read, int64_t* values_read) { RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false)); @@ -699,7 +859,7 @@ Status ColumnReader::Impl::ReadNullableBatch<::arrow::BooleanType, BooleanType>( } template -Status ColumnReader::Impl::InitDataBuffer(int batch_size) { +Status PrimitiveImpl::InitDataBuffer(int batch_size) { using ArrowCType = typename ArrowType::c_type; data_buffer_ = std::make_shared(pool_); RETURN_NOT_OK(data_buffer_->Resize(batch_size * sizeof(ArrowCType), false)); @@ -709,7 +869,7 @@ Status ColumnReader::Impl::InitDataBuffer(int batch_size) { } template <> -Status ColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size) { +Status PrimitiveImpl::InitDataBuffer<::arrow::BooleanType>(int batch_size) { data_buffer_ = std::make_shared(pool_); RETURN_NOT_OK(data_buffer_->Resize(::arrow::BitUtil::CeilByte(batch_size) / 8, false)); data_buffer_ptr_ = data_buffer_->mutable_data(); @@ -718,7 +878,7 @@ Status ColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size) return Status::OK(); } -Status ColumnReader::Impl::InitValidBits(int batch_size) { +Status PrimitiveImpl::InitValidBits(int batch_size) { valid_bits_idx_ = 0; if (descr_->max_definition_level() > 0) { int valid_bits_size = @@ -732,17 +892,13 @@ Status ColumnReader::Impl::InitValidBits(int batch_size) { return Status::OK(); } -Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels, +Status PrimitiveImpl::WrapIntoListArray(const int16_t* def_levels, const int16_t* rep_levels, int64_t total_levels_read, std::shared_ptr* array) { std::shared_ptr<::arrow::Schema> arrow_schema; RETURN_NOT_OK(FromParquetSchema(input_->schema(), {input_->column_index()}, input_->metadata()->key_value_metadata(), &arrow_schema)); std::shared_ptr current_field = arrow_schema->field(0); - if (current_field->type()->id() == ::arrow::Type::STRUCT) { - return Status::NotImplemented("Structs are not yet supported."); - } - if (descr_->max_repetition_level() > 0) { // Walk downwards to extract nullability std::vector nullable; @@ -843,7 +999,8 @@ Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels, } template -Status ColumnReader::Impl::TypedReadBatch(int batch_size, std::shared_ptr* out) { +Status PrimitiveImpl::TypedReadBatch( + int batch_size, std::shared_ptr* out) { using ArrowCType = typename ArrowType::c_type; int values_to_read = batch_size; @@ -901,7 +1058,7 @@ Status ColumnReader::Impl::TypedReadBatch(int 
batch_size, std::shared_ptr } template <> -Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>( +Status PrimitiveImpl::TypedReadBatch<::arrow::BooleanType, BooleanType>( int batch_size, std::shared_ptr* out) { int values_to_read = batch_size; int total_levels_read = 0; @@ -917,9 +1074,6 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>( int16_t* rep_levels = reinterpret_cast(rep_levels_buffer_.mutable_data()); while ((values_to_read > 0) && column_reader_) { - if (descr_->max_definition_level() > 0) { - RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false)); - } auto reader = dynamic_cast*>(column_reader_.get()); int64_t values_read; int64_t levels_read; @@ -974,7 +1128,7 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>( } template -Status ColumnReader::Impl::ReadByteArrayBatch( +Status PrimitiveImpl::ReadByteArrayBatch( int batch_size, std::shared_ptr* out) { using BuilderType = typename ::arrow::TypeTraits::BuilderType; @@ -1031,7 +1185,7 @@ Status ColumnReader::Impl::ReadByteArrayBatch( } template -Status ColumnReader::Impl::ReadFLBABatch( +Status PrimitiveImpl::ReadFLBABatch( int batch_size, int byte_width, std::shared_ptr* out) { using BuilderType = typename ::arrow::TypeTraits::BuilderType; int total_levels_read = 0; @@ -1083,13 +1237,13 @@ Status ColumnReader::Impl::ReadFLBABatch( } template <> -Status ColumnReader::Impl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>( +Status PrimitiveImpl::TypedReadBatch<::arrow::BinaryType, ByteArrayType>( int batch_size, std::shared_ptr* out) { return ReadByteArrayBatch<::arrow::BinaryType>(batch_size, out); } template <> -Status ColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>( +Status PrimitiveImpl::TypedReadBatch<::arrow::StringType, ByteArrayType>( int batch_size, std::shared_ptr* out) { return ReadByteArrayBatch<::arrow::StringType>(batch_size, out); } @@ -1099,7 +1253,8 @@ Status ColumnReader::Impl::TypedReadBatch<::arrow::StringType, ByteArrayType>( return TypedReadBatch(batch_size, out); \ break; -Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out) { +Status PrimitiveImpl::NextBatch( + int batch_size, std::shared_ptr* out) { if (!column_reader_) { // Exhausted all row groups. 
*out = nullptr; @@ -1155,10 +1310,22 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr* out } } -void ColumnReader::Impl::NextRowGroup() { +void PrimitiveImpl::NextRowGroup() { column_reader_ = input_->Next(); } +Status PrimitiveImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) { + *data = reinterpret_cast(def_levels_buffer_.data()); + *length = def_levels_buffer_.size() / sizeof(int16_t); + return Status::OK(); +} + +Status PrimitiveImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) { + *data = reinterpret_cast(rep_levels_buffer_.data()); + *length = rep_levels_buffer_.size() / sizeof(int16_t); + return Status::OK(); +} + ColumnReader::ColumnReader(std::unique_ptr impl) : impl_(std::move(impl)) {} ColumnReader::~ColumnReader() {} @@ -1167,5 +1334,114 @@ Status ColumnReader::NextBatch(int batch_size, std::shared_ptr* out) { return impl_->NextBatch(batch_size, out); } +// StructImpl methods + +Status StructImpl::DefLevelsToNullArray( + std::shared_ptr* null_bitmap_out, + int64_t* null_count_out) { + std::shared_ptr null_bitmap; + auto null_count = 0; + ValueLevelsPtr def_levels_data; + size_t def_levels_length; + RETURN_NOT_OK(GetDefLevels(&def_levels_data, &def_levels_length)); + RETURN_NOT_OK(GetEmptyBitmap(pool_, + def_levels_length, &null_bitmap)); + uint8_t* null_bitmap_ptr = null_bitmap->mutable_data(); + for (size_t i = 0; i < def_levels_length; i++) { + if (def_levels_data[i] < struct_def_level_) { + // Mark null + null_count += 1; + } else { + DCHECK_EQ(def_levels_data[i], struct_def_level_); + ::arrow::BitUtil::SetBit(null_bitmap_ptr, i); + } + } + + *null_count_out = null_count; + *null_bitmap_out = (null_count == 0) ? nullptr : null_bitmap; + return Status::OK(); +} + +// TODO(itaiin): Consider caching the results of this calculation - +// note that this is only used once for each read for now +Status StructImpl::GetDefLevels(ValueLevelsPtr* data, size_t* length) { + *data = nullptr; + if (children_.size() == 0) { + // Empty struct + *length = 0; + return Status::OK(); + } + + // We have at least one child + ValueLevelsPtr child_def_levels; + size_t child_length; + RETURN_NOT_OK(children_[0]->GetDefLevels(&child_def_levels, &child_length)); + auto size = child_length * sizeof(int16_t); + def_levels_buffer_.Resize(size); + // Initialize with the minimal def level + std::memset(def_levels_buffer_.mutable_data(), -1, size); + auto result_levels = reinterpret_cast(def_levels_buffer_.mutable_data()); + + // When a struct is defined, all of its children def levels are at least at + // nesting level, and def level equals nesting level. + // When a struct is not defined, all of its children def levels are less than + // the nesting level, and the def level equals max(children def levels) + // All other possibilities are malformed definition data. 
+ for (auto& child : children_) { + size_t current_child_length; + RETURN_NOT_OK(child->GetDefLevels(&child_def_levels, ¤t_child_length)); + DCHECK_EQ(child_length, current_child_length); + for (size_t i = 0; i < child_length; i++) { + // Check that value is either uninitialized, or current + // and previous children def levels agree on the struct level + DCHECK((result_levels[i] == -1) || + ((result_levels[i] >= struct_def_level_) == + (child_def_levels[i] >= struct_def_level_))); + result_levels[i] = std::max(result_levels[i], + std::min(child_def_levels[i], struct_def_level_)); + } + } + *data = reinterpret_cast(def_levels_buffer_.data()); + *length = child_length; + return Status::OK(); +} + +void StructImpl::InitField(const NodePtr& node, + const std::vector>& children) { + // Make a shallow node to field conversion from the children fields + std::vector> fields(children.size()); + for (size_t i = 0; i < children.size(); i++) { + fields[i] = children[i]->field(); + } + auto type = std::make_shared<::arrow::StructType>(fields); + field_ = std::make_shared(node->name(), type); +} + +Status StructImpl::GetRepLevels(ValueLevelsPtr* data, size_t* length) { + return Status::NotImplemented("GetRepLevels is not implemented for struct"); +} + +Status StructImpl::NextBatch(int batch_size, std::shared_ptr* out) { + std::vector> children_arrays; + std::shared_ptr null_bitmap; + int64_t null_count; + + // Gather children arrays and def levels + for (auto& child : children_) { + std::shared_ptr child_array; + + RETURN_NOT_OK(child->NextBatch(batch_size, &child_array)); + + children_arrays.push_back(child_array); + } + + RETURN_NOT_OK(DefLevelsToNullArray(&null_bitmap, &null_count)); + + *out = std::make_shared(field()->type(), batch_size, children_arrays, + null_bitmap, null_count); + + return Status::OK(); +} + } // namespace arrow } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h index 24601b8..3a97c83 100644 --- a/src/parquet/arrow/reader.h +++ b/src/parquet/arrow/reader.h @@ -99,10 +99,44 @@ class PARQUET_EXPORT FileReader { // Read column as a whole into an Array. ::arrow::Status ReadColumn(int i, std::shared_ptr<::arrow::Array>* out); - // Read a table of flat columns into a Table. + // NOTE: Experimental API + // Reads a specific top level schema field into an Array + // The index i refers the index of the top level schema field, which may + // be nested or flat - e.g. + // + // 0 foo.bar + // foo.bar.baz + // foo.qux + // 1 foo2 + // 2 foo3 + // + // i=0 will read the entire foo struct, i=1 the foo2 primitive column etc + ::arrow::Status ReadSchemaField(int i, std::shared_ptr<::arrow::Array>* out); + + // NOTE: Experimental API + // Reads a specific top level schema field into an Array, while keeping only chosen + // leaf columns. + // The index i refers the index of the top level schema field, which may + // be nested or flat, and indices vector refers to the leaf column indices - e.g. 
+ // + // i indices + // 0 0 foo.bar + // 0 1 foo.bar.baz + // 0 2 foo.qux + // 1 3 foo2 + // 2 4 foo3 + // + // i=0 indices={0,2} will read a partial struct with foo.bar and foo.quox columns + // i=1 indices={3} will read foo2 column + // i=1 indices={2} will result in out=nullptr + // leaf indices which are unrelated to the schema field are ignored + ::arrow::Status ReadSchemaField(int i, const std::vector& indices, + std::shared_ptr<::arrow::Array>* out); + + // Read a table of columns into a Table ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out); - // Read a table of flat columns into a Table. Read only the indicated column + // Read a table of columns into a Table. Read only the indicated column // indices (relative to the schema) ::arrow::Status ReadTable( const std::vector& column_indices, std::shared_ptr<::arrow::Table>* out); @@ -154,6 +188,8 @@ class PARQUET_EXPORT ColumnReader { explicit ColumnReader(std::unique_ptr impl); friend class FileReader; + friend class PrimitiveImpl; + friend class StructImpl; }; // Helper function to create a file reader from an implementation of an Arrow http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index 83968bc..a78a23b 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -22,6 +22,7 @@ #include #include "parquet/api/schema.h" +#include "parquet/util/schema-util.h" #include "arrow/api.h" @@ -224,11 +225,6 @@ Status StructFromGroup(const GroupNode* group, return Status::OK(); } -bool str_endswith_tuple(const std::string& str) { - if (str.size() >= 6) { return str.substr(str.size() - 6, 6) == "_tuple"; } - return false; -} - Status NodeToList(const GroupNode* group, const std::unordered_set* included_leaf_nodes, TypePtr* out) { *out = nullptr; @@ -240,8 +236,7 @@ Status NodeToList(const GroupNode* group, // Special case mentioned in the format spec: // If the name is array or ends in _tuple, this should be a list of struct // even for single child elements. - if (list_group->field_count() == 1 && list_node->name() != "array" && - !str_endswith_tuple(list_node->name())) { + if (list_group->field_count() == 1 && !HasStructListName(*list_group)) { // List of primitive type std::shared_ptr item_field; RETURN_NOT_OK( http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/arrow/test-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h index a5337cf..e44fcb6 100644 --- a/src/parquet/arrow/test-util.h +++ b/src/parquet/arrow/test-util.h @@ -271,7 +271,7 @@ typename std::enable_if::value, Status>::type NullableA /// Wrap an Array into a ListArray by splitting it up into size lists. /// /// This helper function only supports (size/2) nulls. 
-Status MakeListArary(const std::shared_ptr& values, int64_t size, +Status MakeListArray(const std::shared_ptr& values, int64_t size, int64_t null_count, bool nullable_values, std::shared_ptr<::arrow::ListArray>* out) { // We always include an empty list int64_t non_null_entries = size - null_count - 1; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/column/column-reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc index a31c817..e34ac4c 100644 --- a/src/parquet/column/column-reader-test.cc +++ b/src/parquet/column/column-reader-test.cc @@ -44,7 +44,8 @@ namespace test { template static inline bool vector_equal_with_def_levels(const vector& left, - const vector def_levels, int16_t max_def_levels, const vector& right) { + const vector& def_levels, int16_t max_def_levels, int16_t max_rep_levels, + const vector& right) { size_t i_left = 0; size_t i_right = 0; for (size_t i = 0; i < def_levels.size(); i++) { @@ -57,9 +58,14 @@ static inline bool vector_equal_with_def_levels(const vector& left, } i_left++; i_right++; - } else if (def_levels[i] == (max_def_levels - 1)) { + } else if (def_levels[i] == (max_def_levels -1)) { // Null entry on the lowest nested level i_right++; + } else if (def_levels[i] < (max_def_levels - 1)) { + // Null entry on a higher nesting level, only supported for non-repeating data + if (max_rep_levels == 0) { + i_right++; + } } } @@ -142,7 +148,8 @@ class TestPrimitiveReader : public ::testing::Test { if (max_def_level_ > 0) { ASSERT_TRUE(vector_equal(def_levels_, dresult)); ASSERT_TRUE( - vector_equal_with_def_levels(values_, dresult, max_def_level_, vresult)); + vector_equal_with_def_levels(values_, dresult, max_def_level_, + max_rep_level_, vresult)); } else { ASSERT_TRUE(vector_equal(values_, vresult)); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/column/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index 9749e56..724773d 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -24,6 +24,7 @@ #include #include #include +#include #include @@ -263,20 +264,35 @@ inline int64_t TypedColumnReader::ReadBatch(int64_t batch_size, } inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels, - int16_t max_definition_level, int64_t* values_read, int64_t* null_count, + int16_t max_definition_level, int16_t max_repetition_level, + int64_t* values_read, int64_t* null_count, uint8_t* valid_bits, int64_t valid_bits_offset) { int byte_offset = static_cast(valid_bits_offset) / 8; int bit_offset = static_cast(valid_bits_offset) % 8; uint8_t bitset = valid_bits[byte_offset]; + // TODO(itaiin): As an interim solution we are splitting the code path here + // between repeated+flat column reads, and non-repeated+nested reads. 
+ // Those paths need to be merged in the future for (int i = 0; i < num_def_levels; ++i) { if (def_levels[i] == max_definition_level) { bitset |= (1 << bit_offset); - } else if (def_levels[i] == (max_definition_level - 1)) { - bitset &= ~(1 << bit_offset); - *null_count += 1; + } else if (max_repetition_level > 0) { + // repetition+flat case + if (def_levels[i] == (max_definition_level - 1)) { + bitset &= ~(1 << bit_offset); + *null_count += 1; + } else { + continue; + } } else { - continue; + // non-repeated+nested case + if (def_levels[i] < max_definition_level) { + bitset &= ~(1 << bit_offset); + *null_count += 1; + } else { + throw ParquetException("definition level exceeds maximum"); + } } bit_offset++; @@ -322,9 +338,28 @@ inline int64_t TypedColumnReader::ReadBatchSpaced(int64_t batch_size, } } + // TODO(itaiin): another code path split to merge when the general case is done + bool has_spaced_values; + if (descr_->max_repetition_level() > 0) { + // repeated+flat case + has_spaced_values = !descr_->schema_node()->is_required(); + } else { + // non-repeated+nested case + // Find if a node forces nulls in the lowest level along the hierarchy + const schema::Node* node = descr_->schema_node().get(); + has_spaced_values = false; + while (node) { + auto parent = node->parent(); + if (node->is_optional()) { + has_spaced_values = true; + break; + } + node = parent; + } + } + int64_t null_count = 0; - if (descr_->schema_node()->is_required()) { - // Node is required so there are no null entries on the lowest nesting level. + if (!has_spaced_values) { int values_to_read = 0; for (int64_t i = 0; i < num_def_levels; ++i) { if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; } @@ -336,8 +371,9 @@ inline int64_t TypedColumnReader::ReadBatchSpaced(int64_t batch_size, *values_read = total_values; } else { int16_t max_definition_level = descr_->max_definition_level(); + int16_t max_repetition_level = descr_->max_repetition_level(); DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level, - values_read, &null_count, valid_bits, valid_bits_offset); + max_repetition_level, values_read, &null_count, valid_bits, valid_bits_offset); total_values = ReadValuesSpaced(*values_read, values, static_cast(null_count), valid_bits, valid_bits_offset); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/schema-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc index f8a7205..58f23df 100644 --- a/src/parquet/schema-test.cc +++ b/src/parquet/schema-test.cc @@ -333,6 +333,21 @@ TEST_F(TestGroupNode, Equals) { ASSERT_FALSE(group5.Equals(&group4)); } +TEST_F(TestGroupNode, FieldIndex) { + NodeVector fields = Fields1(); + GroupNode group("group", Repetition::REQUIRED, fields); + for (size_t i = 0; i < fields.size(); i++) { + auto field = group.field(static_cast(i)); + ASSERT_EQ(i, group.FieldIndex(*field.get())); + } + + // Test a non field node + auto non_field_alien = Int32("alien", Repetition::REQUIRED); // other name + auto non_field_familiar = Int32("one", Repetition::REPEATED); // other node + ASSERT_TRUE(group.FieldIndex(*non_field_alien.get()) < 0); + ASSERT_TRUE(group.FieldIndex(*non_field_familiar.get()) < 0); +} + // ---------------------------------------------------------------------- // Test convert group @@ -648,6 +663,17 @@ TEST_F(TestSchemaDescriptor, BuildTree) { ASSERT_EQ(descr_.Column(4)->path()->ToDotString(), "bag.records.item2"); 
ASSERT_EQ(descr_.Column(5)->path()->ToDotString(), "bag.records.item3"); + for (int i = 0; i < nleaves; ++i) { + auto col = descr_.Column(i); + ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node().get())); + } + + // Test non-column nodes find + NodePtr non_column_alien = Int32("alien", Repetition::REQUIRED); // other path + NodePtr non_column_familiar = Int32("a", Repetition::REPEATED); // other node + ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien.get()) < 0); + ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar.get()) < 0); + ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0).get()); ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3).get()); ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc index a3debd0..1209ad1 100644 --- a/src/parquet/schema.cc +++ b/src/parquet/schema.cc @@ -44,6 +44,21 @@ std::shared_ptr ColumnPath::FromDotString(const std::string& dotstri return std::shared_ptr(new ColumnPath(std::move(path))); } +std::shared_ptr ColumnPath::FromNode(const Node& node) { + // Build the path in reverse order as we traverse the nodes to the top + std::vector rpath_; + const Node* cursor = &node; + // The schema node is not part of the ColumnPath + while (cursor->parent()) { + rpath_.push_back(cursor->name()); + cursor = cursor->parent(); + } + + // Build ColumnPath in correct order + std::vector path(rpath_.crbegin(), rpath_.crend()); + return std::make_shared(std::move(path)); +} + std::shared_ptr ColumnPath::extend(const std::string& node_name) const { std::vector path; path.reserve(path_.size() + 1); @@ -70,6 +85,12 @@ const std::vector& ColumnPath::ToDotVector() const { // ---------------------------------------------------------------------- // Base node +const std::shared_ptr Node::path() const { + // TODO(itaiin): Cache the result, or more precisely, cache ->ToDotString() + // since it is being used to access the leaf nodes + return ColumnPath::FromNode(*this); +} + bool Node::EqualsInternal(const Node* other) const { return type_ == other->type_ && name_ == other->name_ && repetition_ == other->repetition_ && logical_type_ == other->logical_type_; @@ -229,6 +250,28 @@ bool GroupNode::Equals(const Node* other) const { return EqualsInternal(static_cast(other)); } +int GroupNode::FieldIndex(const std::string& name) const { + auto search = field_name_to_idx_.find(name); + if (search == field_name_to_idx_.end()) { + // Not found + return -1; + } + return search->second; +} + +int GroupNode::FieldIndex(const Node& node) const { + int result = FieldIndex(node.name()); + if (result < 0) { + return -1; + } + DCHECK(result < field_count()); + if (!node.Equals(field(result).get())) { + // Same name but not the same node + return -1; + } + return result; +} + void GroupNode::Visit(Node::Visitor* visitor) { visitor->Visit(this); } @@ -595,6 +638,8 @@ void SchemaDescriptor::BuildTree(const NodePtr& node, int16_t max_def_level, // Primitive node, append to leaves leaves_.push_back(ColumnDescriptor(node, max_def_level, max_rep_level, this)); leaf_to_base_.emplace(static_cast(leaves_.size()) - 1, base); + leaf_to_idx_.emplace( + node->path()->ToDotString(), static_cast(leaves_.size()) - 1); } } @@ -620,6 +665,28 @@ const ColumnDescriptor* SchemaDescriptor::Column(int i) const { return &leaves_[i]; } +int SchemaDescriptor::ColumnIndex(const std::string& node_path) const { + auto search 
= leaf_to_idx_.find(node_path); + if (search == leaf_to_idx_.end()) { + // Not found + return -1; + } + return search->second; +} + +int SchemaDescriptor::ColumnIndex(const Node& node) const { + int result = ColumnIndex(node.path()->ToDotString()); + if (result < 0) { + return -1; + } + DCHECK(result < num_columns()); + if (!node.Equals(Column(result)->schema_node().get())) { + // Same path but not the same node + return -1; + } + return result; +} + const schema::NodePtr& SchemaDescriptor::GetColumnRoot(int i) const { DCHECK(i >= 0 && i < static_cast(leaves_.size())); return leaf_to_base_.find(i)->second; @@ -638,18 +705,7 @@ int ColumnDescriptor::type_length() const { } const std::shared_ptr ColumnDescriptor::path() const { - // Build the path in reverse order as we traverse the nodes to the top - std::vector rpath_; - const Node* node = primitive_node_; - // The schema node is not part of the ColumnPath - while (node->parent()) { - rpath_.push_back(node->name()); - node = node->parent(); - } - - // Build ColumnPath in correct order - std::vector path_(rpath_.crbegin(), rpath_.crend()); - return std::make_shared(std::move(path_)); + return primitive_node_->path(); } } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/schema.h ---------------------------------------------------------------------- diff --git a/src/parquet/schema.h b/src/parquet/schema.h index 1615798..856f72d 100644 --- a/src/parquet/schema.h +++ b/src/parquet/schema.h @@ -38,6 +38,8 @@ class SchemaDescriptor; namespace schema { +class Node; + // List encodings: using the terminology from Impala to define different styles // of representing logical lists (a.k.a. ARRAY types) in Parquet schemas. Since // the converted type named in the Parquet metadata is ConvertedType::LIST we @@ -87,6 +89,7 @@ class PARQUET_EXPORT ColumnPath { explicit ColumnPath(std::vector&& path) : path_(path) {} static std::shared_ptr FromDotString(const std::string& dotstring); + static std::shared_ptr FromNode(const Node& node); std::shared_ptr extend(const std::string& node_name) const; std::string ToDotString() const; @@ -139,6 +142,8 @@ class PARQUET_EXPORT Node { const Node* parent() const { return parent_; } + const std::shared_ptr path() const; + // ToParquet returns an opaque void* to avoid exporting // parquet::SchemaElement into the public API virtual void ToParquet(void* opaque_element) const = 0; @@ -249,6 +254,8 @@ class PARQUET_EXPORT GroupNode : public Node { bool Equals(const Node* other) const override; const NodePtr& field(int i) const { return fields_[i]; } + int FieldIndex(const std::string& name) const; + int FieldIndex(const Node& node) const; int field_count() const { return static_cast(fields_.size()); } @@ -261,16 +268,23 @@ class PARQUET_EXPORT GroupNode : public Node { const NodeVector& fields, LogicalType::type logical_type = LogicalType::NONE, int id = -1) : Node(Node::GROUP, name, repetition, logical_type, id), fields_(fields) { + field_name_to_idx_.clear(); + auto field_idx = 0; for (NodePtr& field : fields_) { field->SetParent(this); + field_name_to_idx_[field->name()] = field_idx++; } } NodeVector fields_; bool EqualsInternal(const GroupNode* other) const; + // Mapping between field name to the field index + std::unordered_map field_name_to_idx_; + FRIEND_TEST(TestGroupNode, Attrs); FRIEND_TEST(TestGroupNode, Equals); + FRIEND_TEST(TestGroupNode, FieldIndex); }; // ---------------------------------------------------------------------- @@ -362,6 +376,11 @@ class 
PARQUET_EXPORT SchemaDescriptor { const ColumnDescriptor* Column(int i) const; + // Get the index of a column by its dotstring path, or negative value if not found + int ColumnIndex(const std::string& node_path) const; + // Get the index of a column by its node, or negative value if not found + int ColumnIndex(const schema::Node& node) const; + bool Equals(const SchemaDescriptor& other) const; // The number of physical columns appearing in the file @@ -398,6 +417,9 @@ class PARQUET_EXPORT SchemaDescriptor { // -- -- -- c | // -- -- -- -- d std::unordered_map leaf_to_base_; + + // Mapping between ColumnPath DotString to the leaf index + std::unordered_map leaf_to_idx_; }; } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/29ed01ea/src/parquet/util/schema-util.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/schema-util.h b/src/parquet/util/schema-util.h new file mode 100644 index 0000000..618d21e --- /dev/null +++ b/src/parquet/util/schema-util.h @@ -0,0 +1,84 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_SCHEMA_UTIL_H +#define PARQUET_SCHEMA_UTIL_H + +#include +#include +#include + +#include "parquet/exception.h" +#include "parquet/schema.h" +#include "parquet/types.h" +#include "parquet/util/logging.h" + +using parquet::ParquetException; +using parquet::SchemaDescriptor; +using parquet::schema::GroupNode; +using parquet::schema::NodePtr; +using parquet::schema::Node; +using parquet::LogicalType; + +inline bool str_endswith_tuple(const std::string& str) { + if (str.size() >= 6) { return str.substr(str.size() - 6, 6) == "_tuple"; } + return false; +} + +// Special case mentioned in the format spec: +// If the name is array or ends in _tuple, this should be a list of struct +// even for single child elements. +inline bool HasStructListName(const GroupNode& node) { + return (node.name() == "array" || + str_endswith_tuple(node.name())); +} + +// TODO(itaiin): This aux. function is to be deleted once repeated structs are supported +inline bool IsSimpleStruct(const NodePtr& node) { + if (!node->is_group()) return false; + if (node->is_repeated()) return false; + if (node->logical_type() == LogicalType::LIST) return false; + // Special case mentioned in the format spec: + // If the name is array or ends in _tuple, this should be a list of struct + // even for single child elements. 
+ auto group = static_cast(node.get()); + if (group->field_count() == 1 && HasStructListName(*group)) return false; + + return true; +} + +// Coalesce a list of schema fields indices which are the roots of the +// columns referred by a list of column indices +inline bool ColumnIndicesToFieldIndices(const SchemaDescriptor& descr, + const std::vector& column_indices, std::vector* out) { + const GroupNode* group = descr.group_node(); + std::unordered_set already_added; + out->clear(); + for (auto& column_idx : column_indices) { + auto field_node = descr.GetColumnRoot(column_idx); + auto field_idx = group->FieldIndex(field_node->name()); + if (field_idx < 0) { + return false; + } + auto insertion = already_added.insert(field_idx); + if (insertion.second) { out->push_back(field_idx); } + } + + return true; +} + +#endif // PARQUET_SCHEMA_UTIL_H
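----------------------------------------------------------------------

The ReadSchemaField API documented in reader.h above is easiest to see end
to end with a small driver. A minimal sketch, assuming the Status-returning
ReadableFile/OpenFile signatures used elsewhere in this patch; the file name
and its contents are hypothetical:

#include <memory>

#include "arrow/api.h"
#include "arrow/io/file.h"
#include "parquet/arrow/reader.h"

int main() {
  // Hypothetical input: assumes "nested.parquet" was written with the simple
  // nested schema from the tests above (group1 {leaf1, leaf2}, then leaf3).
  std::shared_ptr<::arrow::io::ReadableFile> file;
  if (!::arrow::io::ReadableFile::Open("nested.parquet", &file).ok()) {
    return 1;
  }

  std::unique_ptr<parquet::arrow::FileReader> reader;
  if (!parquet::arrow::OpenFile(file, ::arrow::default_memory_pool(),
          ::parquet::default_reader_properties(), nullptr, &reader).ok()) {
    return 1;
  }

  // i=0 reads the whole group1 struct (both leaves) as one StructArray.
  std::shared_ptr<::arrow::Array> whole_struct;
  if (!reader->ReadSchemaField(0, &whole_struct).ok()) return 1;

  // i=0, indices={0} keeps only leaf column 0 (group1.leaf1), yielding a
  // partial struct; leaf indices unrelated to the field are ignored.
  std::shared_ptr<::arrow::Array> partial_struct;
  if (!reader->ReadSchemaField(0, {0}, &partial_struct).ok()) return 1;

  return 0;
}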
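For orientation, the definition levels that CreateSimpleNestedParquet writes
above work out as follows. With optional group1 { required leaf1; optional
leaf2 } the maximum definition levels are 1 for leaf1, 2 for leaf2, and 0 for
the required top-level leaf3, so the i % 3 cycle in the test encodes:

  i % 3   group1     leaf1 def   leaf2 def          leaf3 def
  0       null       0           0 (group null)     0
  1       present    1           1 (leaf null)      0
  2       present    1           2 (value)          0

One row in three therefore carries a null struct, and leaf2 is additionally
null in one further row - the NUM_SIMPLE_TEST_ROWS / 3 and
NUM_SIMPLE_TEST_ROWS * 2 / 3 null counts asserted in ReadIntoTableFull.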
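StructImpl reconstructs a struct column in two steps: merge the children's
definition levels, then translate the merged levels into a null bitmap. A
standalone sketch of both rules, assuming non-repeated data (GetRepLevels is
NotImplemented for structs in this patch); the function names here are
hypothetical:

#include <algorithm>
#include <cstdint>
#include <vector>

// Merge child definition levels into struct definition levels, mirroring
// StructImpl::GetDefLevels above: each row takes the maximum over children
// of min(child def level, struct def level).
std::vector<int16_t> MergeStructDefLevels(
    const std::vector<std::vector<int16_t>>& child_levels,
    int16_t struct_def_level) {
  std::vector<int16_t> result(child_levels.front().size(), -1);
  for (const auto& child : child_levels) {
    for (size_t i = 0; i < child.size(); ++i) {
      result[i] = std::max(result[i], std::min(child[i], struct_def_level));
    }
  }
  return result;
}

// A row is null exactly when its merged def level falls below the struct's
// own def level, mirroring StructImpl::DefLevelsToNullArray.
std::vector<bool> StructValidity(const std::vector<int16_t>& def_levels,
                                 int16_t struct_def_level) {
  std::vector<bool> valid(def_levels.size());
  for (size_t i = 0; i < def_levels.size(); ++i) {
    valid[i] = def_levels[i] >= struct_def_level;
  }
  return valid;
}

Feeding it the test's per-leaf levels - leaf1 = {0,1,1}, leaf2 = {0,1,2},
struct def level 1 - gives merged levels {0,1,1} and validity
{false,true,true}: one null struct row in three, as asserted above.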
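The DefinitionLevelsToBitmap change in column/reader.h splits two code paths
that the TODO above says should eventually merge. A self-contained sketch of
just the null-counting rule, with hypothetical naming:

#include <cstdint>
#include <vector>

// Repeated+flat: only (max_def - 1) marks a null value slot; anything lower
// is an empty/absent list and consumes no slot at all. Non-repeated+nested:
// every level below max_def is a null injected by some optional ancestor and
// still occupies a (spaced) value slot.
int64_t CountNullSlots(const std::vector<int16_t>& def_levels,
                       int16_t max_def, int16_t max_rep) {
  int64_t nulls = 0;
  for (int16_t d : def_levels) {
    if (d == max_def) continue;         // fully defined value
    if (max_rep > 0) {
      if (d == max_def - 1) ++nulls;    // null entry at the innermost level
      // d < max_def - 1: empty-list marker, no value slot
    } else {
      ++nulls;                          // null at an ancestor, still a slot
    }
  }
  return nulls;
}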
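The new schema lookups (GroupNode::FieldIndex, SchemaDescriptor::ColumnIndex,
and ColumnIndicesToFieldIndices from schema-util.h) compose as follows. A
sketch that rebuilds the simple nested test schema; the Init usage follows
the pattern in schema-test.cc:

#include <memory>
#include <vector>

#include "parquet/schema.h"
#include "parquet/util/schema-util.h"

using parquet::Repetition;
using parquet::SchemaDescriptor;
using parquet::Type;
using parquet::schema::GroupNode;
using parquet::schema::NodePtr;
using parquet::schema::PrimitiveNode;

int main() {
  // Rebuild the simple nested test schema: group1 {leaf1, leaf2} and leaf3.
  NodePtr group1 = GroupNode::Make(
      "group1", Repetition::OPTIONAL,
      {PrimitiveNode::Make("leaf1", Repetition::REQUIRED, Type::INT32),
       PrimitiveNode::Make("leaf2", Repetition::OPTIONAL, Type::INT32)});
  NodePtr leaf3 =
      PrimitiveNode::Make("leaf3", Repetition::REQUIRED, Type::INT32);
  NodePtr root =
      GroupNode::Make("schema", Repetition::REQUIRED, {group1, leaf3});

  SchemaDescriptor descr;
  descr.Init(root);

  // Leaf lookup by dotstring path; a negative result means "not found".
  int leaf2_col = descr.ColumnIndex("group1.leaf2");  // == 1
  int missing = descr.ColumnIndex("group1.nope");     // == -1

  // Field lookup by name on a group node.
  auto root_group = static_cast<const GroupNode*>(root.get());
  int group1_field = root_group->FieldIndex("group1");  // == 0

  // Coalesce leaf columns {0, 2} (group1.leaf1, leaf3) into the top-level
  // field indices {0, 1} that ReadTable now iterates over.
  std::vector<int> fields;
  bool ok = ColumnIndicesToFieldIndices(descr, {0, 2}, &fields);

  return (ok && leaf2_col == 1 && missing < 0 && group1_field == 0) ? 0 : 1;
}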
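Finally, ColumnPath::FromNode (now shared by ColumnDescriptor::path() and the
new Node::path()) is a reverse parent walk. A standalone sketch with a tiny
stand-in Node type, since the real one requires a full schema:

#include <string>
#include <vector>

// Collect names leaf-to-root, stop before the schema root (whose parent is
// null and which is not part of the path), then reverse.
struct Node {
  std::string name;
  const Node* parent;  // nullptr at the schema root
};

std::vector<std::string> PathFromNode(const Node& node) {
  std::vector<std::string> rpath;
  for (const Node* cursor = &node; cursor->parent != nullptr;
       cursor = cursor->parent) {
    rpath.push_back(cursor->name);
  }
  return std::vector<std::string>(rpath.crbegin(), rpath.crend());
}

For the leaf item2 under records under bag this yields {"bag", "records",
"item2"}, i.e. the dotstring "bag.records.item2" checked in schema-test.cc
above.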