parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-762: C++: Use optimistic allocation instead of Arrow Builders
Date Sun, 06 Nov 2016 19:35:07 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 86ebc2393 -> 5e9dc3a96


PARQUET-762: C++: Use optimistic allocation instead of Arrow Builders

About 2x as fast on non-string data.

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #183 from xhochy/PARQUET-762 and squashes the following commits:

d1d11b5 [Uwe L. Korn] PARQUET-762: C++: Use optimistic allocation instead of Arrow Builders


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/5e9dc3a9
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/5e9dc3a9
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/5e9dc3a9

Branch: refs/heads/master
Commit: 5e9dc3a9649adc179423dea6b5d8818525bc24cf
Parents: 86ebc23
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Sun Nov 6 14:35:00 2016 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Sun Nov 6 14:35:00 2016 -0500

----------------------------------------------------------------------
 src/parquet/arrow/reader.cc | 179 ++++++++++++++++++++++++++-------------
 1 file changed, 121 insertions(+), 58 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e9dc3a9/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index bc8c62a..bc9ec8f 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -31,6 +31,7 @@
 #include "arrow/table.h"
 #include "arrow/types/primitive.h"
 #include "arrow/types/string.h"
+#include "arrow/util/bit-util.h"
 #include "arrow/util/status.h"
 
 using arrow::Array;
@@ -50,16 +51,16 @@ namespace arrow {
 
 template <typename ArrowType>
 struct ArrowTypeTraits {
-  typedef ::arrow::NumericBuilder<ArrowType> builder_type;
+  typedef ::arrow::NumericArray<ArrowType> array_type;
 };
 
 template <>
-struct ArrowTypeTraits<BooleanType> {
-  typedef ::arrow::BooleanBuilder builder_type;
+struct ArrowTypeTraits<::arrow::BooleanType> {
+  typedef ::arrow::BooleanArray array_type;
 };
 
 template <typename ArrowType>
-using BuilderType = typename ArrowTypeTraits<ArrowType>::builder_type;
+using ArrayType = typename ArrowTypeTraits<ArrowType>::array_type;
 
 class FileReader::Impl {
  public:
@@ -87,13 +88,13 @@ class FlatColumnReader::Impl {
   template <typename ArrowType, typename ParquetType>
   Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
 
+  template <typename ArrowType>
+  Status InitDataBuffer(int batch_size);
   template <typename ArrowType, typename ParquetType>
-  Status ReadNullableFlatBatch(const int16_t* def_levels,
-      typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
-      BuilderType<ArrowType>* builder);
+  void ReadNullableFlatBatch(const int16_t* def_levels,
+      typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read);
   template <typename ArrowType, typename ParquetType>
-  Status ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read,
-      BuilderType<ArrowType>* builder);
+  void ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read);
 
  private:
   void NextRowGroup();
@@ -106,26 +107,6 @@ class FlatColumnReader::Impl {
             (sizeof(InType) == sizeof(OutType)));
   };
 
-  template <typename InType, typename OutType,
-      typename std::enable_if<can_copy_ptr<InType, OutType>::value>::type* = nullptr>
-  Status ConvertPhysicalType(
-      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
-    *out_ptr = reinterpret_cast<const OutType*>(in_ptr);
-    return Status::OK();
-  }
-
-  template <typename InType, typename OutType,
-      typename std::enable_if<not can_copy_ptr<InType, OutType>::value>::type* = nullptr>
-  Status ConvertPhysicalType(
-      const InType* in_ptr, int64_t length, const OutType** out_ptr) {
-    RETURN_NOT_OK(values_builder_buffer_.Resize(length * sizeof(OutType)));
-    OutType* mutable_out_ptr =
-        reinterpret_cast<OutType*>(values_builder_buffer_.mutable_data());
-    std::copy(in_ptr, in_ptr + length, mutable_out_ptr);
-    *out_ptr = mutable_out_ptr;
-    return Status::OK();
-  }
-
   MemoryPool* pool_;
   const ColumnDescriptor* descr_;
   ParquetFileReader* reader_;
@@ -136,8 +117,12 @@ class FlatColumnReader::Impl {
 
   PoolBuffer values_buffer_;
   PoolBuffer def_levels_buffer_;
-  PoolBuffer values_builder_buffer_;
-  PoolBuffer valid_bytes_buffer_;
+  std::shared_ptr<PoolBuffer> data_buffer_;
+  uint8_t* data_buffer_ptr_;
+  std::shared_ptr<PoolBuffer> valid_bits_buffer_;
+  uint8_t* valid_bits_ptr_;
+  int64_t valid_bits_idx_;
+  int64_t null_count_;
 };
 
 FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
@@ -241,50 +226,97 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
 }
 
 template <typename ArrowType, typename ParquetType>
-Status FlatColumnReader::Impl::ReadNonNullableBatch(typename ParquetType::c_type* values,
-    int64_t values_read, BuilderType<ArrowType>* builder) {
+void FlatColumnReader::Impl::ReadNonNullableBatch(
+    typename ParquetType::c_type* values, int64_t values_read) {
   using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
 
-  DCHECK(builder);
-  const ArrowCType* values_ptr = nullptr;
-  RETURN_NOT_OK(
-      (ConvertPhysicalType<ParquetCType, ArrowCType>(values, values_read, &values_ptr)));
-  RETURN_NOT_OK(builder->Append(values_ptr, values_read));
-  return Status::OK();
+  ArrowCType* out_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
+  std::copy(values, values + values_read, out_ptr + valid_bits_idx_);
+  valid_bits_idx_ += values_read;
+}
+
+template <>
+void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
+    bool* values, int64_t values_read) {
+  for (int64_t i = 0; i < values_read; i++) {
+    if (values[i]) { ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_); }
+    valid_bits_idx_++;
+  }
 }
 
 template <typename ArrowType, typename ParquetType>
-Status FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
-    typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read,
-    BuilderType<ArrowType>* builder) {
+void FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
+    typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read) {
   using ArrowCType = typename ArrowType::c_type;
 
-  DCHECK(builder);
-  RETURN_NOT_OK(values_builder_buffer_.Resize(levels_read * sizeof(ArrowCType)));
-  RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
-  auto values_ptr = reinterpret_cast<ArrowCType*>(values_builder_buffer_.mutable_data());
-  uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+  auto data_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
   int values_idx = 0;
   for (int64_t i = 0; i < levels_read; i++) {
     if (def_levels[i] < descr_->max_definition_level()) {
-      valid_bytes[i] = 0;
+      null_count_++;
     } else {
-      valid_bytes[i] = 1;
-      values_ptr[i] = values[values_idx++];
+      ::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
+      data_ptr[valid_bits_idx_] = values[values_idx++];
     }
+    valid_bits_idx_++;
   }
-  RETURN_NOT_OK(builder->Append(values_ptr, levels_read, valid_bytes));
+}
+
+template <>
+void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
+    const int16_t* def_levels, bool* values, int64_t values_read, int64_t levels_read) {
+  int values_idx = 0;
+  for (int64_t i = 0; i < levels_read; i++) {
+    if (def_levels[i] < descr_->max_definition_level()) {
+      null_count_++;
+    } else {
+      ::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
+      if (values[values_idx++]) {
+        ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_);
+      }
+    }
+    valid_bits_idx_++;
+  }
+}
+
+template <typename ArrowType>
+Status FlatColumnReader::Impl::InitDataBuffer(int batch_size) {
+  using ArrowCType = typename ArrowType::c_type;
+  data_buffer_ = std::make_shared<PoolBuffer>(pool_);
+  RETURN_NOT_OK(data_buffer_->Resize(batch_size * sizeof(ArrowCType)));
+  data_buffer_ptr_ = data_buffer_->mutable_data();
+
+  return Status::OK();
+}
+
+template <>
+Status FlatColumnReader::Impl::InitDataBuffer<::arrow::BooleanType>(int batch_size) {
+  data_buffer_ = std::make_shared<PoolBuffer>(pool_);
+  RETURN_NOT_OK(data_buffer_->Resize(::arrow::BitUtil::CeilByte(batch_size) / 8));
+  data_buffer_ptr_ = data_buffer_->mutable_data();
+  memset(data_buffer_ptr_, 0, data_buffer_->size());
+
   return Status::OK();
 }
 
 template <typename ArrowType, typename ParquetType>
 Status FlatColumnReader::Impl::TypedReadBatch(
     int batch_size, std::shared_ptr<Array>* out) {
+  using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
 
   int values_to_read = batch_size;
-  BuilderType<ArrowType> builder(pool_, field_->type);
+  RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size));
+  valid_bits_idx_ = 0;
+  if (descr_->max_definition_level() > 0) {
+    valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
+    int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size) / 8;
+    valid_bits_buffer_->Resize(valid_bits_size);
+    valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
+    memset(valid_bits_ptr_, 0, valid_bits_size);
+    null_count_ = 0;
+  }
+
   while ((values_to_read > 0) && column_reader_) {
     values_buffer_.Resize(values_to_read * sizeof(ParquetCType));
     if (descr_->max_definition_level() > 0) {
@@ -299,17 +331,48 @@ Status FlatColumnReader::Impl::TypedReadBatch(
                              values_to_read, def_levels, nullptr, values, &values_read));
     values_to_read -= levels_read;
     if (descr_->max_definition_level() == 0) {
-      RETURN_NOT_OK(
-          (ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read, &builder)));
+      ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read);
     } else {
       // As per the defintion and checks for flat columns:
       // descr_->max_definition_level() == 1
-      RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
-          def_levels, values, values_read, levels_read, &builder)));
+      ReadNullableFlatBatch<ArrowType, ParquetType>(
+          def_levels, values, values_read, levels_read);
     }
     if (!column_reader_->HasNext()) { NextRowGroup(); }
   }
-  return builder.Finish(out);
+
+  if (descr_->max_definition_level() > 0) {
+    // TODO: Shrink arrays in the case they are too large
+    if (valid_bits_idx_ < batch_size * 0.8) {
+      // Shrink arrays as they are larger than the output.
+      // TODO(PARQUET-761/ARROW-360): Use realloc internally to shrink the arrays
+      //    without the need for a copy. Given a decent underlying allocator this
+      //    should still free some underlying pages to the OS.
+
+      auto data_buffer = std::make_shared<PoolBuffer>(pool_);
+      RETURN_NOT_OK(data_buffer->Resize(valid_bits_idx_ * sizeof(ArrowCType)));
+      memcpy(data_buffer->mutable_data(), data_buffer_->data(), data_buffer->size());
+      data_buffer_ = data_buffer;
+
+      auto valid_bits_buffer = std::make_shared<PoolBuffer>(pool_);
+      RETURN_NOT_OK(
+          valid_bits_buffer->Resize(::arrow::BitUtil::CeilByte(valid_bits_idx_) / 8));
+      memcpy(valid_bits_buffer->mutable_data(), valid_bits_buffer_->data(),
+          valid_bits_buffer->size());
+      valid_bits_buffer_ = valid_bits_buffer;
+    }
+    *out = std::make_shared<ArrayType<ArrowType>>(
+        field_->type, valid_bits_idx_, data_buffer_, null_count_, valid_bits_buffer_);
+    // Relase the ownership
+    data_buffer_.reset();
+    valid_bits_buffer_.reset();
+    return Status::OK();
+  } else {
+    *out = std::make_shared<ArrayType<ArrowType>>(
+        field_->type, valid_bits_idx_, data_buffer_);
+    data_buffer_.reset();
+    return Status::OK();
+  }
 }
 
 template <>


Mime
View raw message