Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 7878E200C05 for ; Mon, 23 Jan 2017 18:55:15 +0100 (CET) Received: by cust-asf.ponee.io (Postfix) id 7716E160B49; Mon, 23 Jan 2017 17:55:15 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 7504E160B3C for ; Mon, 23 Jan 2017 18:55:14 +0100 (CET) Received: (qmail 87997 invoked by uid 500); 23 Jan 2017 17:55:13 -0000 Mailing-List: contact commits-help@parquet.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.apache.org Delivered-To: mailing list commits@parquet.apache.org Received: (qmail 87988 invoked by uid 99); 23 Jan 2017 17:55:13 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Mon, 23 Jan 2017 17:55:13 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 8AD74DFBE6; Mon, 23 Jan 2017 17:55:13 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wesm@apache.org To: commits@parquet.apache.org Message-Id: <0f04544744a2424c9d032489995ad236@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: parquet-cpp git commit: PARQUET-835: Read Arrow columns in parallel with thread pool Date: Mon, 23 Jan 2017 17:55:13 +0000 (UTC) archived-at: Mon, 23 Jan 2017 17:55:15 -0000 Repository: parquet-cpp Updated Branches: refs/heads/master fecdcbf69 -> 56fbdb63b PARQUET-835: Read Arrow columns in parallel with thread pool Also implements PARQUET-836, but a unit test for it still needs to be added. Author: Wes McKinney Closes #222 from wesm/PARQUET-835 and squashes the 
following commits: 71c700e [Wes McKinney] Add missing include. Update Arrow version 638b4c0 [Wes McKinney] cpplint 7c79ca7 [Wes McKinney] Read Arrow columns in parallel with thread pool Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/56fbdb63 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/56fbdb63 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/56fbdb63 Branch: refs/heads/master Commit: 56fbdb63b908b38d51965d213db5a6ec47ffa9ca Parents: fecdcbf Author: Wes McKinney Authored: Mon Jan 23 12:55:05 2017 -0500 Committer: Wes McKinney Committed: Mon Jan 23 12:55:05 2017 -0500 ---------------------------------------------------------------------- cmake_modules/ThirdpartyToolchain.cmake | 2 +- src/parquet/arrow/arrow-reader-writer-test.cc | 49 +++++++++++- src/parquet/arrow/reader.cc | 90 ++++++++++++++++++++-- src/parquet/arrow/reader.h | 12 +++ src/parquet/arrow/schema.cc | 19 ++++- src/parquet/arrow/schema.h | 4 + 6 files changed, 164 insertions(+), 12 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/cmake_modules/ThirdpartyToolchain.cmake ---------------------------------------------------------------------- diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake index 9a17dcf..8fc1b78 100644 --- a/cmake_modules/ThirdpartyToolchain.cmake +++ b/cmake_modules/ThirdpartyToolchain.cmake @@ -22,7 +22,7 @@ set(THRIFT_VERSION "0.9.1") # Brotli 0.5.2 does not install headers/libraries yet, but 0.6.0.dev does set(BROTLI_VERSION "5db62dcc9d386579609540cdf8869e95ad334bbd") -set(ARROW_VERSION "7d3e2a3ab90324625b738e464a020758379f457a") +set(ARROW_VERSION "085c8754b0ab2da7fcd245fc88bc4de9a6806a4c") # find boost headers and libs set(Boost_DEBUG TRUE) 
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/arrow-reader-writer-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index 57986de..6748a8d 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -17,6 +17,8 @@ #include "gtest/gtest.h" +#include + #include "parquet/api/reader.h" #include "parquet/api/writer.h" @@ -44,7 +46,6 @@ using parquet::schema::NodePtr; using parquet::schema::PrimitiveNode; namespace parquet { - namespace arrow { const int SMALL_SIZE = 100; @@ -184,6 +185,23 @@ using ParquetDataType = DataType::parquet_enum>; template using ParquetWriter = TypedColumnWriter>; +void DoTableRoundtrip( + const std::shared_ptr& table, int num_threads, std::shared_ptr
* out) { + auto sink = std::make_shared(); + + ASSERT_OK_NO_THROW(WriteFlatTable( + table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2)); + + std::shared_ptr buffer = sink->GetBuffer(); + std::unique_ptr reader; + ASSERT_OK_NO_THROW( + OpenFile(std::make_shared(buffer), ::arrow::default_memory_pool(), + ::parquet::default_reader_properties(), nullptr, &reader)); + + reader->set_num_threads(num_threads); + ASSERT_OK_NO_THROW(reader->ReadFlatTable(out)); +} + template class TestParquetIO : public ::testing::Test { public: @@ -642,6 +660,33 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) { this->CheckSingleColumnRequiredTableRead(4); } -} // namespace arrow +TEST(TestArrowReadWrite, MultithreadedRead) { + const int num_columns = 20; + const int num_rows = 1000; + const int num_threads = 4; + + std::shared_ptr<::arrow::Column> column; + std::vector> columns(num_columns); + std::vector> fields(num_columns); + + std::shared_ptr values; + for (int i = 0; i < num_columns; ++i) { + ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, &values)); + std::stringstream ss; + ss << "col" << i; + column = MakeColumn(ss.str(), values, true); + + columns[i] = column; + fields[i] = column->field(); + } + auto schema = std::make_shared<::arrow::Schema>(fields); + auto table = std::make_shared
("schema", schema, columns); + std::shared_ptr
result; + DoTableRoundtrip(table, num_threads, &result); + + ASSERT_TRUE(table->Equals(result)); +} + +} // namespace arrow } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc index c9f986a..9221041 100644 --- a/src/parquet/arrow/reader.cc +++ b/src/parquet/arrow/reader.cc @@ -18,9 +18,12 @@ #include "parquet/arrow/reader.h" #include +#include #include +#include #include #include +#include #include #include "parquet/arrow/schema.h" @@ -65,11 +68,17 @@ class FileReader::Impl { Status GetFlatColumn(int i, std::unique_ptr* out); Status ReadFlatColumn(int i, std::shared_ptr* out); Status ReadFlatTable(std::shared_ptr
* out); + Status ReadFlatTable( + const std::vector& column_indices, std::shared_ptr
* out); const ParquetFileReader* parquet_reader() const { return reader_.get(); } + void set_num_threads(int num_threads) { num_threads_ = num_threads; } + private: MemoryPool* pool_; std::unique_ptr reader_; + + int num_threads_; }; class FlatColumnReader::Impl { @@ -125,7 +134,7 @@ class FlatColumnReader::Impl { }; FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr reader) - : pool_(pool), reader_(std::move(reader)) {} + : pool_(pool), reader_(std::move(reader)), num_threads_(1) {} bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) { if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) { @@ -156,19 +165,73 @@ Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr* out) { } Status FileReader::Impl::ReadFlatTable(std::shared_ptr
* table) { + std::vector column_indices(reader_->metadata()->num_columns()); + + for (size_t i = 0; i < column_indices.size(); ++i) { + column_indices[i] = i; + } + return ReadFlatTable(column_indices, table); +} + +template +Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) { + std::vector thread_pool; + thread_pool.reserve(nthreads); + std::atomic task_counter(0); + + std::mutex error_mtx; + bool error_occurred = false; + Status error; + + for (int thread_id = 0; thread_id < nthreads; ++thread_id) { + thread_pool.emplace_back( + [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() { + int task_id; + while (!error_occurred) { + task_id = task_counter.fetch_add(1); + if (task_id >= num_tasks) { break; } + Status s = func(task_id); + if (!s.ok()) { + std::lock_guard lock(error_mtx); + error_occurred = true; + error = s; + break; + } + } + }); + } + for (auto&& thread : thread_pool) { + thread.join(); + } + if (error_occurred) { return error; } + return Status::OK(); +} + +Status FileReader::Impl::ReadFlatTable( + const std::vector& indices, std::shared_ptr
* table) { auto descr = reader_->metadata()->schema(); const std::string& name = descr->name(); std::shared_ptr<::arrow::Schema> schema; - RETURN_NOT_OK(FromParquetSchema(descr, &schema)); - - int num_columns = reader_->metadata()->num_columns(); + RETURN_NOT_OK(FromParquetSchema(descr, indices, &schema)); + int num_columns = static_cast(indices.size()); + int nthreads = std::min(num_threads_, num_columns); std::vector> columns(num_columns); - for (int i = 0; i < num_columns; i++) { + + auto ReadColumn = [&indices, &schema, &columns, this](int i) { std::shared_ptr array; - RETURN_NOT_OK(ReadFlatColumn(i, &array)); - columns[i] = std::make_shared(schema->field(i), array); + RETURN_NOT_OK(ReadFlatColumn(indices[i], &array)); + columns[i] = std::make_shared(schema->field(indices[i]), array); + return Status::OK(); + }; + + if (nthreads == 1) { + for (int i = 0; i < num_columns; i++) { + RETURN_NOT_OK(ReadColumn(i)); + } + } else { + RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumn)); } *table = std::make_shared
(name, schema, columns); @@ -218,6 +281,19 @@ Status FileReader::ReadFlatTable(std::shared_ptr
* out) { } } +Status FileReader::ReadFlatTable( + const std::vector& column_indices, std::shared_ptr
* out) { + try { + return impl_->ReadFlatTable(column_indices, out); + } catch (const ::parquet::ParquetException& e) { + return ::arrow::Status::IOError(e.what()); + } +} + +void FileReader::set_num_threads(int num_threads) { + impl_->set_num_threads(num_threads); +} + const ParquetFileReader* FileReader::parquet_reader() const { return impl_->parquet_reader(); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h index 518ae4b..934b826 100644 --- a/src/parquet/arrow/reader.h +++ b/src/parquet/arrow/reader.h @@ -19,6 +19,7 @@ #define PARQUET_ARROW_READER_H #include +#include #include "parquet/api/reader.h" #include "parquet/api/schema.h" @@ -94,13 +95,24 @@ class PARQUET_EXPORT FileReader { // // Returns error status if the column of interest is not flat. ::arrow::Status GetFlatColumn(int i, std::unique_ptr* out); + // Read column as a whole into an Array. ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out); + // Read a table of flat columns into a Table. ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out); + // Read a table of flat columns into a Table. Read only the indicated column + // indices (relative to the schema) + ::arrow::Status ReadFlatTable( + const std::vector& column_indices, std::shared_ptr<::arrow::Table>* out); + const ParquetFileReader* parquet_reader() const; + /// Set the number of threads to use during reads of multiple columns. 
By + /// default only 1 thread is used + void set_num_threads(int num_threads); + virtual ~FileReader(); private: http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/schema.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc index b086b9e..4f17f5e 100644 --- a/src/parquet/arrow/schema.cc +++ b/src/parquet/arrow/schema.cc @@ -250,8 +250,6 @@ Status NodeToField(const NodePtr& node, std::shared_ptr* out) { Status FromParquetSchema( const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) { - // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes - // from the root Parquet node const GroupNode* schema_node = parquet_schema->group_node(); std::vector> fields(schema_node->field_count()); @@ -263,6 +261,23 @@ Status FromParquetSchema( return Status::OK(); } +Status FromParquetSchema(const SchemaDescriptor* parquet_schema, + const std::vector& column_indices, std::shared_ptr<::arrow::Schema>* out) { + // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes + // from the root Parquet node + const GroupNode* schema_node = parquet_schema->group_node(); + + int num_fields = static_cast(column_indices.size()); + + std::vector> fields(num_fields); + for (int i = 0; i < num_fields; i++) { + RETURN_NOT_OK(NodeToField(schema_node->field(column_indices[i]), &fields[i])); + } + + *out = std::make_shared<::arrow::Schema>(fields); + return Status::OK(); +} + Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name, bool nullable, const WriterProperties& properties, NodePtr* out) { Repetition::type repetition = nullable ? 
Repetition::OPTIONAL : Repetition::REQUIRED; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/schema.h ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h index 6917b90..bb77a4e 100644 --- a/src/parquet/arrow/schema.h +++ b/src/parquet/arrow/schema.h @@ -19,6 +19,7 @@ #define PARQUET_ARROW_SCHEMA_H #include +#include #include "arrow/schema.h" #include "arrow/type.h" @@ -40,6 +41,9 @@ namespace arrow { ::arrow::Status PARQUET_EXPORT NodeToField( const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out); +::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema, + const std::vector& column_indices, std::shared_ptr<::arrow::Schema>* out); + ::arrow::Status PARQUET_EXPORT FromParquetSchema( const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);