parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-835: Read Arrow columns in parallel with thread pool
Date Mon, 23 Jan 2017 17:55:13 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master fecdcbf69 -> 56fbdb63b


PARQUET-835: Read Arrow columns in parallel with thread pool

Also implements PARQUET-836, but need to add a unit test for that

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #222 from wesm/PARQUET-835 and squashes the following commits:

71c700e [Wes McKinney] Add missing include. Update Arrow version
638b4c0 [Wes McKinney] cpplint
7c79ca7 [Wes McKinney] Read Arrow columns in parallel with thread pool


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/56fbdb63
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/56fbdb63
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/56fbdb63

Branch: refs/heads/master
Commit: 56fbdb63b908b38d51965d213db5a6ec47ffa9ca
Parents: fecdcbf
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Mon Jan 23 12:55:05 2017 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Mon Jan 23 12:55:05 2017 -0500

----------------------------------------------------------------------
 cmake_modules/ThirdpartyToolchain.cmake       |  2 +-
 src/parquet/arrow/arrow-reader-writer-test.cc | 49 +++++++++++-
 src/parquet/arrow/reader.cc                   | 90 ++++++++++++++++++++--
 src/parquet/arrow/reader.h                    | 12 +++
 src/parquet/arrow/schema.cc                   | 19 ++++-
 src/parquet/arrow/schema.h                    |  4 +
 6 files changed, 164 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/cmake_modules/ThirdpartyToolchain.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index 9a17dcf..8fc1b78 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -22,7 +22,7 @@ set(THRIFT_VERSION "0.9.1")
 
 # Brotli 0.5.2 does not install headers/libraries yet, but 0.6.0.dev does
 set(BROTLI_VERSION "5db62dcc9d386579609540cdf8869e95ad334bbd")
-set(ARROW_VERSION "7d3e2a3ab90324625b738e464a020758379f457a")
+set(ARROW_VERSION "085c8754b0ab2da7fcd245fc88bc4de9a6806a4c")
 
 # find boost headers and libs
 set(Boost_DEBUG TRUE)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 57986de..6748a8d 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -17,6 +17,8 @@
 
 #include "gtest/gtest.h"
 
+#include <sstream>
+
 #include "parquet/api/reader.h"
 #include "parquet/api/writer.h"
 
@@ -44,7 +46,6 @@ using parquet::schema::NodePtr;
 using parquet::schema::PrimitiveNode;
 
 namespace parquet {
-
 namespace arrow {
 
 const int SMALL_SIZE = 100;
@@ -184,6 +185,23 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
 template <typename T>
 using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
 
+void DoTableRoundtrip(
+    const std::shared_ptr<Table>& table, int num_threads, std::shared_ptr<Table>* out) {
+  auto sink = std::make_shared<InMemoryOutputStream>();
+
+  ASSERT_OK_NO_THROW(WriteFlatTable(
+      table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));
+
+  std::shared_ptr<Buffer> buffer = sink->GetBuffer();
+  std::unique_ptr<FileReader> reader;
+  ASSERT_OK_NO_THROW(
+      OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+          ::parquet::default_reader_properties(), nullptr, &reader));
+
+  reader->set_num_threads(num_threads);
+  ASSERT_OK_NO_THROW(reader->ReadFlatTable(out));
+}
+
 template <typename TestType>
 class TestParquetIO : public ::testing::Test {
  public:
@@ -642,6 +660,33 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
   this->CheckSingleColumnRequiredTableRead(4);
 }
 
-}  // namespace arrow
+TEST(TestArrowReadWrite, MultithreadedRead) {
+  const int num_columns = 20;
+  const int num_rows = 1000;
+  const int num_threads = 4;
+
+  std::shared_ptr<::arrow::Column> column;
+  std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns);
+  std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns);
+
+  std::shared_ptr<Array> values;
+  for (int i = 0; i < num_columns; ++i) {
+    ASSERT_OK(NullableArray<::arrow::DoubleType>(num_rows, num_rows / 10, &values));
+    std::stringstream ss;
+    ss << "col" << i;
+    column = MakeColumn(ss.str(), values, true);
+
+    columns[i] = column;
+    fields[i] = column->field();
+  }
+  auto schema = std::make_shared<::arrow::Schema>(fields);
+  auto table = std::make_shared<Table>("schema", schema, columns);
 
+  std::shared_ptr<Table> result;
+  DoTableRoundtrip(table, num_threads, &result);
+
+  ASSERT_TRUE(table->Equals(result));
+}
+
+}  // namespace arrow
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index c9f986a..9221041 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -18,9 +18,12 @@
 #include "parquet/arrow/reader.h"
 
 #include <algorithm>
+#include <atomic>
 #include <chrono>
+#include <mutex>
 #include <queue>
 #include <string>
+#include <thread>
 #include <vector>
 
 #include "parquet/arrow/schema.h"
@@ -65,11 +68,17 @@ class FileReader::Impl {
   Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
   Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
   Status ReadFlatTable(std::shared_ptr<Table>* out);
+  Status ReadFlatTable(
+      const std::vector<int>& column_indices, std::shared_ptr<Table>* out);
   const ParquetFileReader* parquet_reader() const { return reader_.get(); }
 
+  void set_num_threads(int num_threads) { num_threads_ = num_threads; }
+
  private:
   MemoryPool* pool_;
   std::unique_ptr<ParquetFileReader> reader_;
+
+  int num_threads_;
 };
 
 class FlatColumnReader::Impl {
@@ -125,7 +134,7 @@ class FlatColumnReader::Impl {
 };
 
 FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
-    : pool_(pool), reader_(std::move(reader)) {}
+    : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
 
 bool FileReader::Impl::CheckForFlatColumn(const ColumnDescriptor* descr) {
   if ((descr->max_repetition_level() > 0) || (descr->max_definition_level() > 1)) {
@@ -156,19 +165,73 @@ Status FileReader::Impl::ReadFlatColumn(int i, std::shared_ptr<Array>* out) {
 }
 
 Status FileReader::Impl::ReadFlatTable(std::shared_ptr<Table>* table) {
+  std::vector<int> column_indices(reader_->metadata()->num_columns());
+
+  for (size_t i = 0; i < column_indices.size(); ++i) {
+    column_indices[i] = i;
+  }
+  return ReadFlatTable(column_indices, table);
+}
+
+template <class FUNCTION>
+Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
+  std::vector<std::thread> thread_pool;
+  thread_pool.reserve(nthreads);
+  std::atomic<int> task_counter(0);
+
+  std::mutex error_mtx;
+  bool error_occurred = false;
+  Status error;
+
+  for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
+    thread_pool.emplace_back(
+        [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
+          int task_id;
+          while (!error_occurred) {
+            task_id = task_counter.fetch_add(1);
+            if (task_id >= num_tasks) { break; }
+            Status s = func(task_id);
+            if (!s.ok()) {
+              std::lock_guard<std::mutex> lock(error_mtx);
+              error_occurred = true;
+              error = s;
+              break;
+            }
+          }
+        });
+  }
+  for (auto&& thread : thread_pool) {
+    thread.join();
+  }
+  if (error_occurred) { return error; }
+  return Status::OK();
+}
+
+Status FileReader::Impl::ReadFlatTable(
+    const std::vector<int>& indices, std::shared_ptr<Table>* table) {
   auto descr = reader_->metadata()->schema();
 
   const std::string& name = descr->name();
   std::shared_ptr<::arrow::Schema> schema;
-  RETURN_NOT_OK(FromParquetSchema(descr, &schema));
-
-  int num_columns = reader_->metadata()->num_columns();
+  RETURN_NOT_OK(FromParquetSchema(descr, indices, &schema));
 
+  int num_columns = static_cast<int>(indices.size());
+  int nthreads = std::min<int>(num_threads_, num_columns);
   std::vector<std::shared_ptr<Column>> columns(num_columns);
-  for (int i = 0; i < num_columns; i++) {
+
+  auto ReadColumn = [&indices, &schema, &columns, this](int i) {
     std::shared_ptr<Array> array;
-    RETURN_NOT_OK(ReadFlatColumn(i, &array));
-    columns[i] = std::make_shared<Column>(schema->field(i), array);
+    RETURN_NOT_OK(ReadFlatColumn(indices[i], &array));
+    columns[i] = std::make_shared<Column>(schema->field(indices[i]), array);
+    return Status::OK();
+  };
+
+  if (nthreads == 1) {
+    for (int i = 0; i < num_columns; i++) {
+      RETURN_NOT_OK(ReadColumn(i));
+    }
+  } else {
+    RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumn));
   }
 
   *table = std::make_shared<Table>(name, schema, columns);
@@ -218,6 +281,19 @@ Status FileReader::ReadFlatTable(std::shared_ptr<Table>* out) {
   }
 }
 
+Status FileReader::ReadFlatTable(
+    const std::vector<int>& column_indices, std::shared_ptr<Table>* out) {
+  try {
+    return impl_->ReadFlatTable(column_indices, out);
+  } catch (const ::parquet::ParquetException& e) {
+    return ::arrow::Status::IOError(e.what());
+  }
+}
+
+void FileReader::set_num_threads(int num_threads) {
+  impl_->set_num_threads(num_threads);
+}
+
 const ParquetFileReader* FileReader::parquet_reader() const {
   return impl_->parquet_reader();
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 518ae4b..934b826 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -19,6 +19,7 @@
 #define PARQUET_ARROW_READER_H
 
 #include <memory>
+#include <vector>
 
 #include "parquet/api/reader.h"
 #include "parquet/api/schema.h"
@@ -94,13 +95,24 @@ class PARQUET_EXPORT FileReader {
   //
   // Returns error status if the column of interest is not flat.
   ::arrow::Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
+
   // Read column as a whole into an Array.
   ::arrow::Status ReadFlatColumn(int i, std::shared_ptr<::arrow::Array>* out);
+
   // Read a table of flat columns into a Table.
   ::arrow::Status ReadFlatTable(std::shared_ptr<::arrow::Table>* out);
 
+  // Read a table of flat columns into a Table. Read only the indicated column
+  // indices (relative to the schema)
+  ::arrow::Status ReadFlatTable(
+      const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out);
+
   const ParquetFileReader* parquet_reader() const;
 
+  /// Set the number of threads to use during reads of multiple columns. By
+  /// default only 1 thread is used
+  void set_num_threads(int num_threads);
+
   virtual ~FileReader();
 
  private:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index b086b9e..4f17f5e 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -250,8 +250,6 @@ Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
 
 Status FromParquetSchema(
     const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out) {
-  // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
-  // from the root Parquet node
   const GroupNode* schema_node = parquet_schema->group_node();
 
   std::vector<std::shared_ptr<Field>> fields(schema_node->field_count());
@@ -263,6 +261,23 @@ Status FromParquetSchema(
   return Status::OK();
 }
 
+Status FromParquetSchema(const SchemaDescriptor* parquet_schema,
+    const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out) {
+  // TODO(wesm): Consider adding an arrow::Schema name attribute, which comes
+  // from the root Parquet node
+  const GroupNode* schema_node = parquet_schema->group_node();
+
+  int num_fields = static_cast<int>(column_indices.size());
+
+  std::vector<std::shared_ptr<Field>> fields(num_fields);
+  for (int i = 0; i < num_fields; i++) {
+    RETURN_NOT_OK(NodeToField(schema_node->field(column_indices[i]), &fields[i]));
+  }
+
+  *out = std::make_shared<::arrow::Schema>(fields);
+  return Status::OK();
+}
+
 Status ListToNode(const std::shared_ptr<::arrow::ListType>& type, const std::string& name,
     bool nullable, const WriterProperties& properties, NodePtr* out) {
   Repetition::type repetition = nullable ? Repetition::OPTIONAL : Repetition::REQUIRED;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/56fbdb63/src/parquet/arrow/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index 6917b90..bb77a4e 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -19,6 +19,7 @@
 #define PARQUET_ARROW_SCHEMA_H
 
 #include <memory>
+#include <vector>
 
 #include "arrow/schema.h"
 #include "arrow/type.h"
@@ -40,6 +41,9 @@ namespace arrow {
 ::arrow::Status PARQUET_EXPORT NodeToField(
     const schema::NodePtr& node, std::shared_ptr<::arrow::Field>* out);
 
+::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,
+    const std::vector<int>& column_indices, std::shared_ptr<::arrow::Schema>* out);
+
 ::arrow::Status PARQUET_EXPORT FromParquetSchema(
     const SchemaDescriptor* parquet_schema, std::shared_ptr<::arrow::Schema>* out);
 


Mime
View raw message