From commits-return-1227-archive-asf-public=cust-asf.ponee.io@parquet.apache.org Tue Feb 13 16:08:52 2018 Return-Path: X-Original-To: archive-asf-public@cust-asf.ponee.io Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by mx-eu-01.ponee.io (Postfix) with SMTP id 5C010180656 for ; Tue, 13 Feb 2018 16:08:51 +0100 (CET) Received: (qmail 26713 invoked by uid 500); 13 Feb 2018 15:08:50 -0000 Mailing-List: contact commits-help@parquet.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.apache.org Delivered-To: mailing list commits@parquet.apache.org Received: (qmail 26704 invoked by uid 99); 13 Feb 2018 15:08:50 -0000 Received: from ec2-52-202-80-70.compute-1.amazonaws.com (HELO gitbox.apache.org) (52.202.80.70) by apache.org (qpsmtpd/0.29) with ESMTP; Tue, 13 Feb 2018 15:08:50 +0000 Received: by gitbox.apache.org (ASF Mail Server at gitbox.apache.org, from userid 33) id D2DF68235E; Tue, 13 Feb 2018 15:08:49 +0000 (UTC) Date: Tue, 13 Feb 2018 15:08:49 +0000 To: "commits@parquet.apache.org" Subject: [parquet-cpp] branch master updated: PARQUET-1200: Support reading a single Arrow column from a Parquet file MIME-Version: 1.0 Content-Type: text/plain; charset=utf-8 Content-Transfer-Encoding: 8bit Message-ID: <151853452974.18574.8226121450324326230@gitbox.apache.org> From: uwe@apache.org X-Git-Host: gitbox.apache.org X-Git-Repo: parquet-cpp X-Git-Refname: refs/heads/master X-Git-Reftype: branch X-Git-Oldrev: a9fdeb0b1f2105290a88246c7a7fd8717751200a X-Git-Newrev: d5fc8482221c9350dafcce5864e80ad42b853387 X-Git-Rev: d5fc8482221c9350dafcce5864e80ad42b853387 X-Git-NotificationType: ref_changed_plus_diff X-Git-Multimail-Version: 1.5.dev Auto-Submitted: auto-generated This is an automated email from the ASF dual-hosted git repository. 
uwe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/parquet-cpp.git The following commit(s) were added to refs/heads/master by this push: new d5fc848 PARQUET-1200: Support reading a single Arrow column from a Parquet file d5fc848 is described below commit d5fc8482221c9350dafcce5864e80ad42b853387 Author: Korn, Uwe AuthorDate: Tue Feb 13 16:08:31 2018 +0100 PARQUET-1200: Support reading a single Arrow column from a Parquet file cc @lorenzhs Author: Korn, Uwe Closes #434 from xhochy/PARQUET-1200 and squashes the following commits: 98ac6f0 [Korn, Uwe] PARQUET-1200: Support reading a single Arrow column from a Parquet file --- src/parquet/arrow/arrow-reader-writer-test.cc | 2 +- src/parquet/arrow/arrow-schema-test.cc | 4 +- src/parquet/arrow/reader.cc | 64 +++++++++++++++++++++++---- src/parquet/arrow/reader.h | 56 +++++++++++++++++++++++ src/parquet/file_reader.cc | 18 ++++---- 5 files changed, 123 insertions(+), 21 deletions(-) diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index db12fb4..369eb2e 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -1496,7 +1496,7 @@ TEST(TestArrowReadWrite, ReadSingleRowGroup) { std::shared_ptr r1, r2; // Read everything ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1)); - ASSERT_OK_NO_THROW(reader->ReadRowGroup(1, &r2)); + ASSERT_OK_NO_THROW(reader->RowGroup(1)->ReadTable(&r2)); std::shared_ptr
concatenated; ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated)); diff --git a/src/parquet/arrow/arrow-schema-test.cc b/src/parquet/arrow/arrow-schema-test.cc index b33eda1..771b996 100644 --- a/src/parquet/arrow/arrow-schema-test.cc +++ b/src/parquet/arrow/arrow-schema-test.cc @@ -62,8 +62,8 @@ class TestConvertParquetSchema : public ::testing::Test { for (int i = 0; i < expected_schema->num_fields(); ++i) { auto lhs = result_schema_->field(i); auto rhs = expected_schema->field(i); - EXPECT_TRUE(lhs->Equals(rhs)) << i << " " << lhs->ToString() - << " != " << rhs->ToString(); + EXPECT_TRUE(lhs->Equals(rhs)) + << i << " " << lhs->ToString() << " != " << rhs->ToString(); } } diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index 9318305..7f81771 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -170,6 +170,8 @@ class FileReader::Impl { int16_t def_level, std::unique_ptr* out); Status ReadColumn(int i, std::shared_ptr* out); + Status ReadColumnChunk(int column_index, int row_group_index, + std::shared_ptr* out); Status GetSchema(std::shared_ptr<::arrow::Schema>* out); Status GetSchema(const std::vector& indices, std::shared_ptr<::arrow::Schema>* out); @@ -391,6 +393,24 @@ Status FileReader::Impl::GetSchema(const std::vector& indices, return FromParquetSchema(descr, indices, parquet_key_value_metadata, out); } +Status FileReader::Impl::ReadColumnChunk(int column_index, int row_group_index, + std::shared_ptr* out) { + auto rg_metadata = reader_->metadata()->RowGroup(row_group_index); + int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values(); + + std::unique_ptr input( + new SingleRowGroupIterator(column_index, row_group_index, reader_.get())); + + std::unique_ptr impl( + new PrimitiveImpl(pool_, std::move(input))); + ColumnReader flat_column_reader(std::move(impl)); + + std::shared_ptr array; + RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array)); + *out = array; + return 
Status::OK(); +} + Status FileReader::Impl::ReadRowGroup(int row_group_index, const std::vector& indices, std::shared_ptr<::arrow::Table>* out) { @@ -408,17 +428,9 @@ Status FileReader::Impl::ReadRowGroup(int row_group_index, auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata, this](int i) { int column_index = indices[i]; - int64_t records_to_read = rg_metadata->ColumnChunk(column_index)->num_values(); - - std::unique_ptr input( - new SingleRowGroupIterator(column_index, row_group_index, reader_.get())); - - std::unique_ptr impl( - new PrimitiveImpl(pool_, std::move(input))); - ColumnReader flat_column_reader(std::move(impl)); std::shared_ptr array; - RETURN_NOT_OK(flat_column_reader.NextBatch(records_to_read, &array)); + RETURN_NOT_OK(ReadColumnChunk(column_index, row_group_index, &array)); columns[i] = std::make_shared(schema->field(i), array); return Status::OK(); }; @@ -561,6 +573,11 @@ Status FileReader::ReadRowGroup(int i, const std::vector& indices, } } +std::shared_ptr FileReader::RowGroup(int row_group_index) { + return std::shared_ptr( + new RowGroupReader(impl_.get(), row_group_index)); +} + int FileReader::num_row_groups() const { return impl_->num_row_groups(); } void FileReader::set_num_threads(int num_threads) { impl_->set_num_threads(num_threads); } @@ -1354,5 +1371,34 @@ Status StructImpl::NextBatch(int64_t records_to_read, std::shared_ptr* ou return Status::OK(); } +std::shared_ptr RowGroupReader::Column(int column_index) { + return std::shared_ptr( + new ColumnChunkReader(impl_, row_group_index_, column_index)); +} + +Status RowGroupReader::ReadTable(const std::vector& column_indices, + std::shared_ptr<::arrow::Table>* out) { + return impl_->ReadRowGroup(row_group_index_, column_indices, out); +} + +Status RowGroupReader::ReadTable(std::shared_ptr<::arrow::Table>* out) { + return impl_->ReadRowGroup(row_group_index_, out); +} + +RowGroupReader::~RowGroupReader() {} + 
+RowGroupReader::RowGroupReader(FileReader::Impl* impl, int row_group_index) + : impl_(impl), row_group_index_(row_group_index) {} + +Status ColumnChunkReader::Read(std::shared_ptr<::arrow::Array>* out) { + return impl_->ReadColumnChunk(column_index_, row_group_index_, out); +} + +ColumnChunkReader::~ColumnChunkReader() {} + +ColumnChunkReader::ColumnChunkReader(FileReader::Impl* impl, int row_group_index, + int column_index) + : impl_(impl), column_index_(column_index), row_group_index_(row_group_index) {} + } // namespace arrow } // namespace parquet diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h index faaef9a..95b2186 100644 --- a/src/parquet/arrow/reader.h +++ b/src/parquet/arrow/reader.h @@ -39,11 +39,27 @@ namespace parquet { namespace arrow { +class ColumnChunkReader; class ColumnReader; +class RowGroupReader; // Arrow read adapter class for deserializing Parquet files as Arrow row // batches. // +// This interface caters for different use cases and thus provides different +// interfaces. In its most simplistic form, we cater for a user that wants to +// read the whole Parquet file at once with the FileReader::ReadTable method. +// +// More advanced users that also want to implement parallelism on top of each +// single Parquet file should do this on the RowGroup level. For this, they can +// call FileReader::RowGroup(i)->ReadTable to receive only the specified +// RowGroup as a table. +// +// In the most advanced situation, where a consumer wants to independently read +// RowGroups in parallel and consume each column individually, they can call +// FileReader::RowGroup(i)->Column(j)->Read and receive an arrow::Array +// instance. +// // TODO(wesm): nested data does not always make sense with this user // interface unless you are only reading a single leaf node from a branch of // a table. 
For example: @@ -150,6 +166,10 @@ class PARQUET_EXPORT FileReader { ::arrow::Status ScanContents(std::vector columns, const int32_t column_batch_size, int64_t* num_rows); + /// \brief Return a reader for the RowGroup, this object must not outlive the + /// FileReader. + std::shared_ptr RowGroup(int row_group_index); + int num_row_groups() const; const ParquetFileReader* parquet_reader() const; @@ -161,10 +181,46 @@ class PARQUET_EXPORT FileReader { virtual ~FileReader(); private: + friend ColumnChunkReader; + friend RowGroupReader; + class PARQUET_NO_EXPORT Impl; std::unique_ptr impl_; }; +class PARQUET_EXPORT RowGroupReader { + public: + std::shared_ptr Column(int column_index); + + ::arrow::Status ReadTable(const std::vector& column_indices, + std::shared_ptr<::arrow::Table>* out); + ::arrow::Status ReadTable(std::shared_ptr<::arrow::Table>* out); + + virtual ~RowGroupReader(); + + private: + friend FileReader; + RowGroupReader(FileReader::Impl* reader, int row_group_index); + + FileReader::Impl* impl_; + int row_group_index_; +}; + +class PARQUET_EXPORT ColumnChunkReader { + public: + ::arrow::Status Read(std::shared_ptr<::arrow::Array>* out); + + virtual ~ColumnChunkReader(); + + private: + friend RowGroupReader; + ColumnChunkReader(FileReader::Impl* impl, int row_group_index, int column_index); + + FileReader::Impl* impl_; + int column_index_; + int row_group_index_; +}; + // At this point, the column reader is a stream iterator. It only knows how to // read the next batch of values for a particular column from the file until it // runs out. 
diff --git a/src/parquet/file_reader.cc b/src/parquet/file_reader.cc index 72c71c6..7b74812 100644 --- a/src/parquet/file_reader.cc +++ b/src/parquet/file_reader.cc @@ -64,9 +64,9 @@ RowGroupReader::RowGroupReader(std::unique_ptr contents) : contents_(std::move(contents)) {} std::shared_ptr RowGroupReader::Column(int i) { - DCHECK(i < metadata()->num_columns()) << "The RowGroup only has " - << metadata()->num_columns() - << "columns, requested column: " << i; + DCHECK(i < metadata()->num_columns()) + << "The RowGroup only has " << metadata()->num_columns() + << "columns, requested column: " << i; const ColumnDescriptor* descr = metadata()->schema()->Column(i); std::unique_ptr page_reader = contents_->GetColumnPageReader(i); @@ -76,9 +76,9 @@ std::shared_ptr RowGroupReader::Column(int i) { } std::unique_ptr RowGroupReader::GetColumnPageReader(int i) { - DCHECK(i < metadata()->num_columns()) << "The RowGroup only has " - << metadata()->num_columns() - << "columns, requested column: " << i; + DCHECK(i < metadata()->num_columns()) + << "The RowGroup only has " << metadata()->num_columns() + << "columns, requested column: " << i; return contents_->GetColumnPageReader(i); } @@ -302,9 +302,9 @@ std::shared_ptr ParquetFileReader::metadata() const { } std::shared_ptr ParquetFileReader::RowGroup(int i) { - DCHECK(i < metadata()->num_row_groups()) << "The file only has " - << metadata()->num_row_groups() - << "row groups, requested reader for: " << i; + DCHECK(i < metadata()->num_row_groups()) + << "The file only has " << metadata()->num_row_groups() + << "row groups, requested reader for: " << i; return contents_->GetRowGroup(i); } -- To stop receiving notification emails like this one, please contact uwe@apache.org.