parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-833: C++: Provide API to write spaced arrays
Date Mon, 23 Jan 2017 01:08:00 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 65e7db196 -> 8f0a51d42


PARQUET-833: C++: Provide API to write spaced arrays

Slight performance improvement. Not so much visible as in the read path as the write performance
is dominated by the dictionary appends.

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #220 from xhochy/PARQUET-833 and squashes the following commits:

12514cf [Uwe L. Korn] ninja format
d22477d [Uwe L. Korn] Call correct statistics update
25c9747 [Uwe L. Korn] Add more unittests, remove logging from public header
0c5ce71 [Uwe L. Korn] Save the memory copy for numerical types
1ed0407 [Uwe L. Korn] PARQUET-833: C++: Provide API to write spaced arrays


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/8f0a51d4
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/8f0a51d4
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/8f0a51d4

Branch: refs/heads/master
Commit: 8f0a51d428a625bb4f204766ba2ff415dd48a879
Parents: 65e7db1
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Sun Jan 22 20:07:53 2017 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Sun Jan 22 20:07:53 2017 -0500

----------------------------------------------------------------------
 src/parquet/arrow/writer.cc                 | 61 +++++++++++++++---
 src/parquet/column/column-writer-test.cc    | 30 ++++++++-
 src/parquet/column/statistics-test.cc       | 12 ++++
 src/parquet/column/statistics.cc            | 44 +++++++++++++
 src/parquet/column/statistics.h             |  2 +
 src/parquet/column/writer.cc                | 82 ++++++++++++++++++++++++
 src/parquet/column/writer.h                 | 12 ++++
 src/parquet/encodings/dictionary-encoding.h | 10 +++
 src/parquet/encodings/encoder.h             | 14 ++++
 src/parquet/encodings/encoding-test.cc      | 10 ++-
 src/parquet/util/bit-util.h                 | 12 ++--
 11 files changed, 269 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index f9087ff..993ff67 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <vector>
 
+#include "parquet/util/bit-util.h"
 #include "parquet/util/logging.h"
 
 #include "parquet/arrow/schema.h"
@@ -51,6 +52,11 @@ class FileWriter::Impl {
   Status TypedWriteBatch(
       ColumnWriter* writer, const PrimitiveArray* data, int64_t offset, int64_t length);
 
+  template <typename ParquetType, typename ArrowType>
+  Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t length,
+      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+      int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr);
+
   // TODO(uwe): Same code as in reader.cc the only difference is the name of the temporary
   // buffer
   template <typename InType, typename OutType>
@@ -136,19 +142,18 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
       PARQUET_CATCH_NOT_OK(
           writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr));
     } else {
-      RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
-      auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
-      int buffer_idx = 0;
+      const uint8_t* valid_bits = data->null_bitmap_data();
+      INIT_BITSET(valid_bits, offset);
       for (int i = 0; i < length; i++) {
-        if (data->IsNull(offset + i)) {
-          def_levels_ptr[i] = 0;
-        } else {
+        if (bitset & (1 << bit_offset)) {
           def_levels_ptr[i] = 1;
-          buffer_ptr[buffer_idx++] = static_cast<ParquetCType>(data_ptr[i]);
+        } else {
+          def_levels_ptr[i] = 0;
         }
+        READ_NEXT_BITSET(valid_bits);
       }
-      PARQUET_CATCH_NOT_OK(
-          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+      RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
+          writer, length, def_levels_ptr, nullptr, valid_bits, offset, data_ptr)));
     }
   } else {
     return Status::NotImplemented("no support for max definition level > 1 yet");
@@ -157,6 +162,44 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
   return Status::OK();
 }
 
+template <typename ParquetType, typename ArrowType>
+Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
+    int64_t length, const int16_t* def_levels, const int16_t* rep_levels,
+    const uint8_t* valid_bits, int64_t valid_bits_offset,
+    const typename ArrowType::c_type* data_ptr) {
+  using ParquetCType = typename ParquetType::c_type;
+
+  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
+  auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
+  INIT_BITSET(valid_bits, valid_bits_offset);
+  for (int i = 0; i < length; i++) {
+    if (bitset & (1 << bit_offset)) {
+      buffer_ptr[i] = static_cast<ParquetCType>(data_ptr[i]);
+    }
+    READ_NEXT_BITSET(valid_bits);
+  }
+  PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
+      length, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+
+  return Status::OK();
+}
+
+#define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                        \
+  template <>                                                                          \
+  Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(                 \
+      TypedColumnWriter<ParquetType> * writer, int64_t length,                         \
+      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \
+      int64_t valid_bits_offset, const CType* data_ptr) {                              \
+    PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(                                     \
+        length, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr));     \
+    return Status::OK();                                                               \
+  }
+
+NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
+NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
+NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
+
 // This specialization seems quite similar but it significantly differs in two points:
 // * offset is added at the most latest time to the pointer as we have sub-byte access
 // * Arrow data is stored bitwise thus we cannot use std::copy to transform from

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index aac1a94..446811e 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -108,7 +108,17 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
       bool enable_dictionary, bool enable_statistics, int64_t num_rows = SMALL_SIZE) {
     this->GenerateData(num_rows);
 
-    // Test case 1: required and non-repeated, so no definition or repetition levels
+    this->WriteRequiredWithSettings(
+        encoding, compression, enable_dictionary, enable_statistics, num_rows);
+    this->ReadAndCompare(compression, num_rows);
+
+    this->WriteRequiredWithSettingsSpaced(
+        encoding, compression, enable_dictionary, enable_statistics, num_rows);
+    this->ReadAndCompare(compression, num_rows);
+  }
+
+  void WriteRequiredWithSettings(Encoding::type encoding, Compression::type compression,
+      bool enable_dictionary, bool enable_statistics, int64_t num_rows) {
     ColumnProperties column_properties(
         encoding, compression, enable_dictionary, enable_statistics);
     std::shared_ptr<TypedColumnWriter<TestType>> writer =
@@ -117,7 +127,25 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
     // The behaviour should be independent from the number of Close() calls
     writer->Close();
     writer->Close();
+  }
+
+  void WriteRequiredWithSettingsSpaced(Encoding::type encoding,
+      Compression::type compression, bool enable_dictionary, bool enable_statistics,
+      int64_t num_rows) {
+    std::vector<uint8_t> valid_bits(
+        BitUtil::RoundUpNumBytes(this->values_.size()) + 1, 255);
+    ColumnProperties column_properties(
+        encoding, compression, enable_dictionary, enable_statistics);
+    std::shared_ptr<TypedColumnWriter<TestType>> writer =
+        this->BuildWriter(num_rows, column_properties);
+    writer->WriteBatchSpaced(
+        this->values_.size(), nullptr, nullptr, valid_bits.data(), 0, this->values_ptr_);
+    // The behaviour should be independent from the number of Close() calls
+    writer->Close();
+    writer->Close();
+  }
 
+  void ReadAndCompare(Compression::type compression, int64_t num_rows) {
     this->SetupValuesOut(num_rows);
     this->ReadColumnFully(compression);
     Compare<T> compare(this->descr_);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/column/statistics-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc
index c8641a1..38cdc23 100644
--- a/src/parquet/column/statistics-test.cc
+++ b/src/parquet/column/statistics-test.cc
@@ -66,10 +66,22 @@ class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> {
     TypedStats statistics2(this->schema_.Column(0), encoded_min, encoded_max,
         this->values_.size(), 0, 0, true);
 
+    TypedStats statistics3(this->schema_.Column(0));
+    std::vector<uint8_t> valid_bits(
+        BitUtil::RoundUpNumBytes(this->values_.size()) + 1, 255);
+    statistics3.UpdateSpaced(
+        this->values_ptr_, valid_bits.data(), 0, this->values_.size(), 0);
+    std::string encoded_min_spaced = statistics3.EncodeMin();
+    std::string encoded_max_spaced = statistics3.EncodeMax();
+
     ASSERT_EQ(encoded_min, statistics2.EncodeMin());
     ASSERT_EQ(encoded_max, statistics2.EncodeMax());
     ASSERT_EQ(statistics1.min(), statistics2.min());
     ASSERT_EQ(statistics1.max(), statistics2.max());
+    ASSERT_EQ(encoded_min_spaced, statistics2.EncodeMin());
+    ASSERT_EQ(encoded_max_spaced, statistics2.EncodeMax());
+    ASSERT_EQ(statistics3.min(), statistics2.min());
+    ASSERT_EQ(statistics3.max(), statistics2.max());
   }
 
   void TestReset() {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/column/statistics.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc
index 9b76fab..df7a308 100644
--- a/src/parquet/column/statistics.cc
+++ b/src/parquet/column/statistics.cc
@@ -106,6 +106,50 @@ void TypedRowGroupStatistics<DType>::Update(
 }
 
 template <typename DType>
+void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
+    const uint8_t* valid_bits, int64_t valid_bits_offset, int64_t num_not_null,
+    int64_t num_null) {
+  DCHECK(num_not_null >= 0);
+  DCHECK(num_null >= 0);
+
+  IncrementNullCount(num_null);
+  IncrementNumValues(num_not_null);
+  // TODO: support distinct count?
+  if (num_not_null == 0) return;
+
+  Compare<T> compare(descr_);
+  INIT_BITSET(valid_bits, valid_bits_offset);
+  // Find first valid entry and use that for min/max
+  // As (num_not_null != 0) there must be one
+  int64_t length = num_null + num_not_null;
+  int64_t i = 0;
+  for (; i < length; i++) {
+    if (bitset & (1 << bit_offset)) { break; }
+    READ_NEXT_BITSET(valid_bits);
+  }
+  T min = values[i];
+  T max = values[i];
+  for (; i < length; i++) {
+    if (bitset & (1 << bit_offset)) {
+      if (compare(values[i], min)) {
+        min = values[i];
+      } else if (compare(max, values[i])) {
+        max = values[i];
+      }
+    }
+    READ_NEXT_BITSET(valid_bits);
+  }
+  if (!has_min_max_) {
+    has_min_max_ = true;
+    Copy(min, &min_, min_buffer_.get());
+    Copy(max, &max_, max_buffer_.get());
+  } else {
+    Copy(std::min(min_, min, compare), &min_, min_buffer_.get());
+    Copy(std::max(max_, max, compare), &max_, max_buffer_.get());
+  }
+}
+
+template <typename DType>
 const typename DType::c_type& TypedRowGroupStatistics<DType>::min() const {
   return min_;
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/column/statistics.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h
index cf41dc0..68bd4c8 100644
--- a/src/parquet/column/statistics.h
+++ b/src/parquet/column/statistics.h
@@ -149,6 +149,8 @@ class TypedRowGroupStatistics : public RowGroupStatistics {
   void Merge(const TypedRowGroupStatistics<DType>& other);
 
   void Update(const T* values, int64_t num_not_null, int64_t num_null);
+  void UpdateSpaced(const T* values, const uint8_t* valid_bits, int64_t valid_bits_spaced,
+      int64_t num_not_null, int64_t num_null);
 
   const T& min() const;
   const T& max() const;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 7319d46..406ded1 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -360,6 +360,59 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
 }
 
 template <typename DType>
+inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const T* values) {
+  int64_t values_to_write = 0;
+  // If the field is required and non-repeated, there are no definition levels
+  if (descr_->max_definition_level() > 0) {
+    for (int64_t i = 0; i < num_values; ++i) {
+      if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
+    }
+
+    WriteDefinitionLevels(num_values, def_levels);
+  } else {
+    // Required field, write all values
+    values_to_write = num_values;
+  }
+
+  // Not present for non-repeated fields
+  if (descr_->max_repetition_level() > 0) {
+    // A row could include more than one value
+    // Count the occasions where we start a new row
+    for (int64_t i = 0; i < num_values; ++i) {
+      if (rep_levels[i] == 0) { num_rows_++; }
+    }
+
+    WriteRepetitionLevels(num_values, rep_levels);
+  } else {
+    // Each value is exactly one row
+    num_rows_ += num_values;
+  }
+
+  if (num_rows_ > expected_rows_) {
+    throw ParquetException("More rows were written in the column chunk than expected");
+  }
+
+  WriteValuesSpaced(num_values, valid_bits, valid_bits_offset, values);
+
+  if (page_statistics_ != nullptr) {
+    page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write,
+        num_values - values_to_write);
+  }
+
+  num_buffered_values_ += num_values;
+  num_buffered_encoded_values_ += values_to_write;
+
+  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
+    AddDataPage();
+  }
+  if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
+
+  return values_to_write;
+}
+
+template <typename DType>
 void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels,
     const int16_t* rep_levels, const T* values) {
   // We check for DataPage limits only after we have inserted the values. If a user
@@ -384,10 +437,39 @@ void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def
 }
 
 template <typename DType>
+void TypedColumnWriter<DType>::WriteBatchSpaced(int64_t num_values,
+    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
+    int64_t valid_bits_offset, const T* values) {
+  // We check for DataPage limits only after we have inserted the values. If a user
+  // writes a large number of values, the DataPage size can be much above the limit.
+  // The purpose of this chunking is to bound this. Even if a user writes large number
+  // of values, the chunking will ensure the AddDataPage() is called at a reasonable
+  // pagesize limit
+  int64_t write_batch_size = properties_->write_batch_size();
+  int num_batches = num_values / write_batch_size;
+  int64_t num_remaining = num_values % write_batch_size;
+  for (int round = 0; round < num_batches; round++) {
+    int64_t offset = round * write_batch_size;
+    WriteMiniBatchSpaced(write_batch_size, &def_levels[offset], &rep_levels[offset],
+        valid_bits, valid_bits_offset + offset, &values[offset]);
+  }
+  // Write the remaining values
+  int64_t offset = num_batches * write_batch_size;
+  WriteMiniBatchSpaced(num_remaining, &def_levels[offset], &rep_levels[offset],
+      valid_bits, valid_bits_offset + offset, &values[offset]);
+}
+
+template <typename DType>
 void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) {
   current_encoder_->Put(values, num_values);
 }
 
+template <typename DType>
+void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values,
+    const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
+  current_encoder_->PutSpaced(values, num_values, valid_bits, valid_bits_offset);
+}
+
 template class TypedColumnWriter<BooleanType>;
 template class TypedColumnWriter<Int32Type>;
 template class TypedColumnWriter<Int64Type>;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 39d5934..b52b932 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -159,6 +159,12 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
   void WriteBatch(int64_t num_values, const int16_t* def_levels,
       const int16_t* rep_levels, const T* values);
 
+  // Write a batch of repetition levels, definition levels, and values to the
+  // column.
+  void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
+      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+      const T* values);
+
  protected:
   std::shared_ptr<Buffer> GetValuesBuffer() override {
     return current_encoder_->FlushValues();
@@ -173,10 +179,16 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
   int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
       const int16_t* rep_levels, const T* values);
 
+  int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
+      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
+      const T* values);
+
   typedef Encoder<DType> EncoderType;
 
   // Write values to a temporary buffer before they are encoded into pages
   void WriteValues(int64_t num_values, const T* values);
+  void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
+      int64_t valid_bits_offset, const T* values);
   std::unique_ptr<EncoderType> current_encoder_;
 
   typedef TypedRowGroupStatistics<DType> TypedStats;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 191bded..d465300 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -27,6 +27,7 @@
 #include "parquet/encodings/decoder.h"
 #include "parquet/encodings/encoder.h"
 #include "parquet/encodings/plain-encoding.h"
+#include "parquet/util/bit-util.h"
 #include "parquet/util/cpu-info.h"
 #include "parquet/util/hash-util.h"
 #include "parquet/util/memory.h"
@@ -238,6 +239,15 @@ class DictEncoder : public Encoder<DType> {
     }
   }
 
+  void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+      int64_t valid_bits_offset) override {
+    INIT_BITSET(valid_bits, valid_bits_offset);
+    for (int32_t i = 0; i < num_values; i++) {
+      if (bitset & (1 << bit_offset)) { Put(src[i]); }
+      READ_NEXT_BITSET(valid_bits);
+    }
+  }
+
   /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
   /// dict_encoded_size() bytes.
   void WriteDict(uint8_t* buffer);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
index c51f8d5..35a36d3 100644
--- a/src/parquet/encodings/encoder.h
+++ b/src/parquet/encodings/encoder.h
@@ -23,6 +23,7 @@
 
 #include "parquet/exception.h"
 #include "parquet/types.h"
+#include "parquet/util/bit-util.h"
 #include "parquet/util/memory.h"
 
 namespace parquet {
@@ -43,6 +44,19 @@ class Encoder {
   virtual int64_t EstimatedDataEncodedSize() = 0;
   virtual std::shared_ptr<Buffer> FlushValues() = 0;
   virtual void Put(const T* src, int num_values) = 0;
+  virtual void PutSpaced(const T* src, int num_values, const uint8_t* valid_bits,
+      int64_t valid_bits_offset) {
+    PoolBuffer buffer(allocator_);
+    buffer.Resize(num_values * sizeof(T));
+    int32_t num_valid_values = 0;
+    INIT_BITSET(valid_bits, valid_bits_offset);
+    T* data = reinterpret_cast<T*>(buffer.mutable_data());
+    for (int32_t i = 0; i < num_values; i++) {
+      if (bitset & (1 << bit_offset)) { data[num_valid_values++] = src[i]; }
+      READ_NEXT_BITSET(valid_bits);
+    }
+    Put(data, num_valid_values);
+  }
 
   const Encoding::type encoding() const { return encoding_; }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
index 999cad9..914904e 100644
--- a/src/parquet/encodings/encoding-test.cc
+++ b/src/parquet/encodings/encoding-test.cc
@@ -246,15 +246,20 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
   static constexpr int TYPE = Type::type_num;
 
   void CheckRoundtrip() {
+    std::vector<uint8_t> valid_bits(BitUtil::RoundUpNumBytes(num_values_) + 1, 255);
     DictEncoder<Type> encoder(descr_.get(), &pool_);
 
     ASSERT_NO_THROW(encoder.Put(draws_, num_values_));
     dict_buffer_ = AllocateBuffer(default_allocator(), encoder.dict_encoded_size());
-
     encoder.WriteDict(dict_buffer_->mutable_data());
-
     std::shared_ptr<Buffer> indices = encoder.FlushValues();
 
+    DictEncoder<Type> spaced_encoder(descr_.get(), &pool_);
+    // PutSpaced should lead to the same results
+    ASSERT_NO_THROW(spaced_encoder.PutSpaced(draws_, num_values_, valid_bits.data(), 0));
+    std::shared_ptr<Buffer> indices_from_spaced = spaced_encoder.FlushValues();
+    ASSERT_TRUE(indices_from_spaced->Equals(*indices));
+
     PlainDecoder<Type> dict_decoder(descr_.get());
     dict_decoder.SetData(
         encoder.num_entries(), dict_buffer_->data(), dict_buffer_->size());
@@ -273,7 +278,6 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
 
     // Also test spaced decoding
     decoder.SetData(num_values_, indices->data(), indices->size());
-    std::vector<uint8_t> valid_bits(BitUtil::RoundUpNumBytes(num_values_) + 1, 255);
     values_decoded =
         decoder.DecodeSpaced(decode_buf_, num_values_, 0, valid_bits.data(), 0);
     ASSERT_EQ(num_values_, values_decoded);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8f0a51d4/src/parquet/util/bit-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-util.h b/src/parquet/util/bit-util.h
index 4d85864..ca094bc 100644
--- a/src/parquet/util/bit-util.h
+++ b/src/parquet/util/bit-util.h
@@ -29,14 +29,12 @@
 #include <cstdint>
 
 #include "parquet/util/compiler-util.h"
-#include "parquet/util/cpu-info.h"
-#include "parquet/util/sse-util.h"
 
 namespace parquet {
 
 #define INIT_BITSET(valid_bits_vector, valid_bits_index) \
-  int byte_offset = valid_bits_index / 8; \
-  int bit_offset = valid_bits_index % 8; \
+  int byte_offset = valid_bits_index / 8;                \
+  int bit_offset = valid_bits_index % 8;                 \
   uint8_t bitset = valid_bits_vector[byte_offset];
 
 #define READ_NEXT_BITSET(valid_bits_vector)  \
@@ -111,12 +109,12 @@ class BitUtil {
   /// Returns 'value' rounded up to the nearest multiple of 'factor' when factor is
   /// a power of two
   static inline int RoundUpToPowerOf2(int value, int factor) {
-    DCHECK((factor > 0) && ((factor & (factor - 1)) == 0));
+    // DCHECK((factor > 0) && ((factor & (factor - 1)) == 0));
     return (value + (factor - 1)) & ~(factor - 1);
   }
 
   static inline int RoundDownToPowerOf2(int value, int factor) {
-    DCHECK((factor > 0) && ((factor & (factor - 1)) == 0));
+    // DCHECK((factor > 0) && ((factor & (factor - 1)) == 0));
     return value & ~(factor - 1);
   }
 
@@ -182,7 +180,7 @@ class BitUtil {
   /// TODO: this could be faster if we use __builtin_clz.  Fix this if this ever shows up
   /// in a hot path.
   static inline int Log2(uint64_t x) {
-    DCHECK_GT(x, 0);
+    // DCHECK_GT(x, 0);
     if (x == 1) return 0;
     // Compute result = ceil(log2(x))
     //                = floor(log2(x - 1)) + 1, for x > 1


Mime
View raw message