Repository: parquet-cpp
Updated Branches:
refs/heads/master 491182c22 -> 84db929ec
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc
index 58f23df..203a312 100644
--- a/src/parquet/schema-test.cc
+++ b/src/parquet/schema-test.cc
@@ -337,13 +337,13 @@ TEST_F(TestGroupNode, FieldIndex) {
NodeVector fields = Fields1();
GroupNode group("group", Repetition::REQUIRED, fields);
for (size_t i = 0; i < fields.size(); i++) {
- auto field = group.field(static_cast<int>(i));
- ASSERT_EQ(i, group.FieldIndex(*field.get()));
+ auto field = group.field(static_cast<int>(i));
+ ASSERT_EQ(i, group.FieldIndex(*field.get()));
}
// Test a non field node
- auto non_field_alien = Int32("alien", Repetition::REQUIRED); // other name
- auto non_field_familiar = Int32("one", Repetition::REPEATED); // other node
+ auto non_field_alien = Int32("alien", Repetition::REQUIRED); // other name
+ auto non_field_familiar = Int32("one", Repetition::REPEATED); // other node
ASSERT_TRUE(group.FieldIndex(*non_field_alien.get()) < 0);
ASSERT_TRUE(group.FieldIndex(*non_field_familiar.get()) < 0);
}
@@ -664,13 +664,13 @@ TEST_F(TestSchemaDescriptor, BuildTree) {
ASSERT_EQ(descr_.Column(5)->path()->ToDotString(), "bag.records.item3");
for (int i = 0; i < nleaves; ++i) {
- auto col = descr_.Column(i);
- ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node().get()));
+ auto col = descr_.Column(i);
+ ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node().get()));
}
// Test non-column nodes find
- NodePtr non_column_alien = Int32("alien", Repetition::REQUIRED); // other path
- NodePtr non_column_familiar = Int32("a", Repetition::REPEATED); // other node
+ NodePtr non_column_alien = Int32("alien", Repetition::REQUIRED); // other path
+ NodePtr non_column_familiar = Int32("a", Repetition::REPEATED); // other node
ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien.get()) < 0);
ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar.get()) < 0);
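
For reference, the behavior exercised above — GroupNode::FieldIndex() and SchemaDescriptor::ColumnIndex() returning a negative index for nodes that do not belong to the schema — lets callers branch instead of asserting. A minimal standalone sketch (the group and column names here are made up for illustration):

    #include <memory>
    #include <vector>

    #include "parquet/schema.h"

    using parquet::LogicalType;
    using parquet::Repetition;
    using parquet::Type;
    namespace schema = parquet::schema;

    int main() {
      std::vector<schema::NodePtr> fields = {schema::PrimitiveNode::Make(
          "one", Repetition::REQUIRED, Type::INT32, LogicalType::NONE)};
      auto group = std::static_pointer_cast<schema::GroupNode>(
          schema::GroupNode::Make("group", Repetition::REQUIRED, fields));

      // A node that is not a child of this group: FieldIndex() reports a
      // negative index, so the caller can handle "not found" gracefully.
      auto stranger = schema::PrimitiveNode::Make(
          "alien", Repetition::REQUIRED, Type::INT32, LogicalType::NONE);
      if (group->FieldIndex(*stranger.get()) < 0) {
        // not a member of this group
      }
      return 0;
    }
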
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/statistics-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
new file mode 100644
index 0000000..cbc761d
--- /dev/null
+++ b/src/parquet/statistics-test.cc
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <array>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/file/reader.h"
+#include "parquet/file/writer.h"
+#include "parquet/schema.h"
+#include "parquet/statistics.h"
+#include "parquet/test-specialization.h"
+#include "parquet/test-util.h"
+#include "parquet/thrift.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+
+namespace parquet {
+
+using schema::NodePtr;
+using schema::PrimitiveNode;
+using schema::GroupNode;
+
+namespace test {
+
+template <typename TestType>
+class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> {
+ public:
+ using T = typename TestType::c_type;
+ using TypedStats = TypedRowGroupStatistics<TestType>;
+
+ std::vector<T> GetDeepCopy(
+ const std::vector<T>&); // allocates new memory for FLBA/ByteArray
+
+ T* GetValuesPointer(std::vector<T>&);
+ void DeepFree(std::vector<T>&);
+
+ void TestMinMaxEncode() {
+ this->GenerateData(1000);
+
+ TypedStats statistics1(this->schema_.Column(0));
+ statistics1.Update(this->values_ptr_, this->values_.size(), 0);
+ std::string encoded_min = statistics1.EncodeMin();
+ std::string encoded_max = statistics1.EncodeMax();
+
+ TypedStats statistics2(this->schema_.Column(0), encoded_min, encoded_max,
+ this->values_.size(), 0, 0, true);
+
+ TypedStats statistics3(this->schema_.Column(0));
+ std::vector<uint8_t> valid_bits(
+ BitUtil::RoundUpNumBytes(static_cast<uint32_t>(this->values_.size())) + 1, 255);
+ statistics3.UpdateSpaced(
+ this->values_ptr_, valid_bits.data(), 0, this->values_.size(), 0);
+ std::string encoded_min_spaced = statistics3.EncodeMin();
+ std::string encoded_max_spaced = statistics3.EncodeMax();
+
+ ASSERT_EQ(encoded_min, statistics2.EncodeMin());
+ ASSERT_EQ(encoded_max, statistics2.EncodeMax());
+ ASSERT_EQ(statistics1.min(), statistics2.min());
+ ASSERT_EQ(statistics1.max(), statistics2.max());
+ ASSERT_EQ(encoded_min_spaced, statistics2.EncodeMin());
+ ASSERT_EQ(encoded_max_spaced, statistics2.EncodeMax());
+ ASSERT_EQ(statistics3.min(), statistics2.min());
+ ASSERT_EQ(statistics3.max(), statistics2.max());
+ }
+
+ void TestReset() {
+ this->GenerateData(1000);
+
+ TypedStats statistics(this->schema_.Column(0));
+ statistics.Update(this->values_ptr_, this->values_.size(), 0);
+ ASSERT_EQ(this->values_.size(), statistics.num_values());
+
+ statistics.Reset();
+ ASSERT_EQ(0, statistics.null_count());
+ ASSERT_EQ(0, statistics.num_values());
+ ASSERT_EQ("", statistics.EncodeMin());
+ ASSERT_EQ("", statistics.EncodeMax());
+ }
+
+ void TestMerge() {
+ int num_null[2];
+ random_numbers(2, 42, 0, 100, num_null);
+
+ TypedStats statistics1(this->schema_.Column(0));
+ this->GenerateData(1000);
+ statistics1.Update(
+ this->values_ptr_, this->values_.size() - num_null[0], num_null[0]);
+
+ TypedStats statistics2(this->schema_.Column(0));
+ this->GenerateData(1000);
+ statistics2.Update(
+ this->values_ptr_, this->values_.size() - num_null[1], num_null[1]);
+
+ TypedStats total(this->schema_.Column(0));
+ total.Merge(statistics1);
+ total.Merge(statistics2);
+
+ ASSERT_EQ(num_null[0] + num_null[1], total.null_count());
+ ASSERT_EQ(this->values_.size() * 2 - num_null[0] - num_null[1], total.num_values());
+ ASSERT_EQ(total.min(), std::min(statistics1.min(), statistics2.min()));
+ ASSERT_EQ(total.max(), std::max(statistics1.max(), statistics2.max()));
+ }
+
+ void TestFullRoundtrip(int64_t num_values, int64_t null_count) {
+ this->GenerateData(num_values);
+
+ // compute statistics for the whole batch
+ TypedStats expected_stats(this->schema_.Column(0));
+ expected_stats.Update(this->values_ptr_, num_values - null_count, null_count);
+
+ auto sink = std::make_shared<InMemoryOutputStream>();
+ auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
+ std::shared_ptr<WriterProperties> writer_properties =
+ WriterProperties::Builder().enable_statistics("column")->build();
+ auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
+ auto row_group_writer = file_writer->AppendRowGroup(num_values);
+ auto column_writer =
+ static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+
+ // simulate the case when data comes from multiple buffers,
+ // in which case special care is necessary for FLBA/ByteArray types
+ for (int i = 0; i < 2; i++) {
+ int64_t batch_num_values = i ? num_values - num_values / 2 : num_values / 2;
+ int64_t batch_null_count = i ? null_count : 0;
+ DCHECK(null_count <= num_values); // avoid too much headache
+ std::vector<int16_t> definition_levels(batch_null_count, 0);
+ definition_levels.insert(
+ definition_levels.end(), batch_num_values - batch_null_count, 1);
+ auto beg = this->values_.begin() + i * num_values / 2;
+ auto end = beg + batch_num_values;
+ std::vector<T> batch = GetDeepCopy(std::vector<T>(beg, end));
+ T* batch_values_ptr = GetValuesPointer(batch);
+ column_writer->WriteBatch(
+ batch_num_values, definition_levels.data(), nullptr, batch_values_ptr);
+ DeepFree(batch);
+ }
+ column_writer->Close();
+ row_group_writer->Close();
+ file_writer->Close();
+
+ auto buffer = sink->GetBuffer();
+ auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
+ auto file_reader = ParquetFileReader::Open(source);
+ auto rg_reader = file_reader->RowGroup(0);
+ auto column_chunk = rg_reader->metadata()->ColumnChunk(0);
+ if (!column_chunk->is_stats_set()) return;
+ std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics();
+ // check values after serialization + deserialization
+ ASSERT_EQ(null_count, stats->null_count());
+ ASSERT_EQ(num_values - null_count, stats->num_values());
+ ASSERT_EQ(expected_stats.EncodeMin(), stats->EncodeMin());
+ ASSERT_EQ(expected_stats.EncodeMax(), stats->EncodeMax());
+ }
+};
+
+template <typename TestType>
+typename TestType::c_type* TestRowGroupStatistics<TestType>::GetValuesPointer(
+ std::vector<typename TestType::c_type>& values) {
+ return values.data();
+}
+
+template <>
+bool* TestRowGroupStatistics<BooleanType>::GetValuesPointer(std::vector<bool>& values) {
+ static std::vector<uint8_t> bool_buffer;
+ bool_buffer.clear();
+ bool_buffer.resize(values.size());
+ std::copy(values.begin(), values.end(), bool_buffer.begin());
+ return reinterpret_cast<bool*>(bool_buffer.data());
+}
+
+template <typename TestType>
+typename std::vector<typename TestType::c_type>
+TestRowGroupStatistics<TestType>::GetDeepCopy(
+ const std::vector<typename TestType::c_type>& values) {
+ return values;
+}
+
+template <>
+std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy(
+ const std::vector<FLBA>& values) {
+ std::vector<FLBA> copy;
+ MemoryPool* pool = ::arrow::default_memory_pool();
+ for (const FLBA& flba : values) {
+ uint8_t* ptr;
+ PARQUET_THROW_NOT_OK(pool->Allocate(FLBA_LENGTH, &ptr));
+ memcpy(ptr, flba.ptr, FLBA_LENGTH);
+ copy.emplace_back(ptr);
+ }
+ return copy;
+}
+
+template <>
+std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy(
+ const std::vector<ByteArray>& values) {
+ std::vector<ByteArray> copy;
+ MemoryPool* pool = default_memory_pool();
+ for (const ByteArray& ba : values) {
+ uint8_t* ptr;
+ PARQUET_THROW_NOT_OK(pool->Allocate(ba.len, &ptr));
+ memcpy(ptr, ba.ptr, ba.len);
+ copy.emplace_back(ba.len, ptr);
+ }
+ return copy;
+}
+
+template <typename TestType>
+void TestRowGroupStatistics<TestType>::DeepFree(
+ std::vector<typename TestType::c_type>& values) {}
+
+template <>
+void TestRowGroupStatistics<FLBAType>::DeepFree(std::vector<FLBA>& values) {
+ MemoryPool* pool = default_memory_pool();
+ for (FLBA& flba : values) {
+ auto ptr = const_cast<uint8_t*>(flba.ptr);
+ memset(ptr, 0, FLBA_LENGTH);
+ pool->Free(ptr, FLBA_LENGTH);
+ }
+}
+
+template <>
+void TestRowGroupStatistics<ByteArrayType>::DeepFree(std::vector<ByteArray>& values) {
+ MemoryPool* pool = default_memory_pool();
+ for (ByteArray& ba : values) {
+ auto ptr = const_cast<uint8_t*>(ba.ptr);
+ memset(ptr, 0, ba.len);
+ pool->Free(ptr, ba.len);
+ }
+}
+
+template <>
+void TestRowGroupStatistics<ByteArrayType>::TestMinMaxEncode() {
+ this->GenerateData(1000);
+ // Test that we encode min max strings correctly
+ TypedRowGroupStatistics<ByteArrayType> statistics1(this->schema_.Column(0));
+ statistics1.Update(this->values_ptr_, this->values_.size(), 0);
+ std::string encoded_min = statistics1.EncodeMin();
+ std::string encoded_max = statistics1.EncodeMax();
+
+ // encoded is same as unencoded
+ ASSERT_EQ(encoded_min,
+ std::string((const char*)statistics1.min().ptr, statistics1.min().len));
+ ASSERT_EQ(encoded_max,
+ std::string((const char*)statistics1.max().ptr, statistics1.max().len));
+
+ TypedRowGroupStatistics<ByteArrayType> statistics2(this->schema_.Column(0), encoded_min,
+ encoded_max, this->values_.size(), 0, 0, true);
+
+ ASSERT_EQ(encoded_min, statistics2.EncodeMin());
+ ASSERT_EQ(encoded_max, statistics2.EncodeMax());
+ ASSERT_EQ(statistics1.min(), statistics2.min());
+ ASSERT_EQ(statistics1.max(), statistics2.max());
+}
+
+using TestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+ ByteArrayType, FLBAType, BooleanType>;
+
+TYPED_TEST_CASE(TestRowGroupStatistics, TestTypes);
+
+TYPED_TEST(TestRowGroupStatistics, MinMaxEncode) {
+ this->SetUpSchema(Repetition::REQUIRED);
+ this->TestMinMaxEncode();
+}
+
+TYPED_TEST(TestRowGroupStatistics, Reset) {
+ this->SetUpSchema(Repetition::OPTIONAL);
+ this->TestReset();
+}
+
+TYPED_TEST(TestRowGroupStatistics, FullRoundtrip) {
+ this->SetUpSchema(Repetition::OPTIONAL);
+ this->TestFullRoundtrip(100, 31);
+ this->TestFullRoundtrip(1000, 415);
+ this->TestFullRoundtrip(10000, 926);
+}
+
+template <typename TestType>
+class TestNumericRowGroupStatistics : public TestRowGroupStatistics<TestType> {};
+
+using NumericTypes = ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType>;
+
+TYPED_TEST_CASE(TestNumericRowGroupStatistics, NumericTypes);
+
+TYPED_TEST(TestNumericRowGroupStatistics, Merge) {
+ this->SetUpSchema(Repetition::OPTIONAL);
+ this->TestMerge();
+}
+
+TEST(CorruptStatistics, Basics) {
+ ApplicationVersion version("parquet-mr version 1.8.0");
+ SchemaDescriptor schema;
+ schema::NodePtr node;
+ std::vector<schema::NodePtr> fields;
+ // Test Physical Types
+ fields.push_back(schema::PrimitiveNode::Make(
+ "col1", Repetition::OPTIONAL, Type::INT32, LogicalType::NONE));
+ fields.push_back(schema::PrimitiveNode::Make(
+ "col2", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::NONE));
+ // Test Logical Types
+ fields.push_back(schema::PrimitiveNode::Make(
+ "col3", Repetition::OPTIONAL, Type::INT32, LogicalType::DATE));
+ fields.push_back(schema::PrimitiveNode::Make(
+ "col4", Repetition::OPTIONAL, Type::INT32, LogicalType::UINT_32));
+ fields.push_back(schema::PrimitiveNode::Make("col5", Repetition::OPTIONAL,
+ Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, 12));
+ fields.push_back(schema::PrimitiveNode::Make(
+ "col6", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::UTF8));
+ node = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
+ schema.Init(node);
+
+ format::ColumnChunk col_chunk;
+ col_chunk.meta_data.__isset.statistics = true;
+ auto column_chunk1 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(0), &version);
+ ASSERT_TRUE(column_chunk1->is_stats_set());
+ auto column_chunk2 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(1), &version);
+ ASSERT_FALSE(column_chunk2->is_stats_set());
+ auto column_chunk3 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(2), &version);
+ ASSERT_TRUE(column_chunk3->is_stats_set());
+ auto column_chunk4 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(3), &version);
+ ASSERT_FALSE(column_chunk4->is_stats_set());
+ auto column_chunk5 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(4), &version);
+ ASSERT_FALSE(column_chunk5->is_stats_set());
+ auto column_chunk6 = ColumnChunkMetaData::Make(
+ reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(5), &version);
+ ASSERT_FALSE(column_chunk6->is_stats_set());
+}
+
+} // namespace test
+} // namespace parquet
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/statistics.cc
----------------------------------------------------------------------
diff --git a/src/parquet/statistics.cc b/src/parquet/statistics.cc
new file mode 100644
index 0000000..d99140e
--- /dev/null
+++ b/src/parquet/statistics.cc
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstring>
+
+#include "parquet/encoding-internal.h"
+#include "parquet/exception.h"
+#include "parquet/statistics.h"
+#include "parquet/util/comparison.h"
+#include "parquet/util/memory.h"
+
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+
+namespace parquet {
+
+template <typename DType>
+TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(
+ const ColumnDescriptor* schema, MemoryPool* pool)
+ : pool_(pool),
+ min_buffer_(AllocateBuffer(pool_, 0)),
+ max_buffer_(AllocateBuffer(pool_, 0)) {
+ SetDescr(schema);
+ Reset();
+}
+
+template <typename DType>
+TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_type& min,
+ const typename DType::c_type& max, int64_t num_values, int64_t null_count,
+ int64_t distinct_count)
+ : pool_(default_memory_pool()),
+ min_buffer_(AllocateBuffer(pool_, 0)),
+ max_buffer_(AllocateBuffer(pool_, 0)) {
+ IncrementNumValues(num_values);
+ IncrementNullCount(null_count);
+ IncrementDistinctCount(distinct_count);
+
+ Copy(min, &min_, min_buffer_.get());
+ Copy(max, &max_, max_buffer_.get());
+ has_min_max_ = true;
+}
+
+template <typename DType>
+TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor* schema,
+ const std::string& encoded_min, const std::string& encoded_max, int64_t num_values,
+ int64_t null_count, int64_t distinct_count, bool has_min_max, MemoryPool* pool)
+ : pool_(pool),
+ min_buffer_(AllocateBuffer(pool_, 0)),
+ max_buffer_(AllocateBuffer(pool_, 0)) {
+ IncrementNumValues(num_values);
+ IncrementNullCount(null_count);
+ IncrementDistinctCount(distinct_count);
+
+ SetDescr(schema);
+
+ if (!encoded_min.empty()) { PlainDecode(encoded_min, &min_); }
+ if (!encoded_max.empty()) { PlainDecode(encoded_max, &max_); }
+ has_min_max_ = has_min_max;
+}
+
+template <typename DType>
+bool TypedRowGroupStatistics<DType>::HasMinMax() const {
+ return has_min_max_;
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::Reset() {
+ ResetCounts();
+ has_min_max_ = false;
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::Update(
+ const T* values, int64_t num_not_null, int64_t num_null) {
+ DCHECK(num_not_null >= 0);
+ DCHECK(num_null >= 0);
+
+ IncrementNullCount(num_null);
+ IncrementNumValues(num_not_null);
+ // TODO: support distinct count?
+ if (num_not_null == 0) return;
+
+ Compare<T> compare(descr_);
+ auto batch_minmax = std::minmax_element(values, values + num_not_null, compare);
+ if (!has_min_max_) {
+ has_min_max_ = true;
+ Copy(*batch_minmax.first, &min_, min_buffer_.get());
+ Copy(*batch_minmax.second, &max_, max_buffer_.get());
+ } else {
+ Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get());
+ Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get());
+ }
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
+ const uint8_t* valid_bits, int64_t valid_bits_offset, int64_t num_not_null,
+ int64_t num_null) {
+ DCHECK(num_not_null >= 0);
+ DCHECK(num_null >= 0);
+
+ IncrementNullCount(num_null);
+ IncrementNumValues(num_not_null);
+ // TODO: support distinct count?
+ if (num_not_null == 0) return;
+
+ Compare<T> compare(descr_);
+ INIT_BITSET(valid_bits, static_cast<int>(valid_bits_offset));
+ // Find first valid entry and use that for min/max
+ // As (num_not_null != 0) there must be one
+ int64_t length = num_null + num_not_null;
+ int64_t i = 0;
+ for (; i < length; i++) {
+ if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { break; }
+ READ_NEXT_BITSET(valid_bits);
+ }
+ T min = values[i];
+ T max = values[i];
+ for (; i < length; i++) {
+ if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+ if (compare(values[i], min)) {
+ min = values[i];
+ } else if (compare(max, values[i])) {
+ max = values[i];
+ }
+ }
+ READ_NEXT_BITSET(valid_bits);
+ }
+ if (!has_min_max_) {
+ has_min_max_ = true;
+ Copy(min, &min_, min_buffer_.get());
+ Copy(max, &max_, max_buffer_.get());
+ } else {
+ Copy(std::min(min_, min, compare), &min_, min_buffer_.get());
+ Copy(std::max(max_, max, compare), &max_, max_buffer_.get());
+ }
+}
+
+template <typename DType>
+const typename DType::c_type& TypedRowGroupStatistics<DType>::min() const {
+ return min_;
+}
+
+template <typename DType>
+const typename DType::c_type& TypedRowGroupStatistics<DType>::max() const {
+ return max_;
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::Merge(const TypedRowGroupStatistics<DType>& other) {
+ this->MergeCounts(other);
+
+ if (!other.HasMinMax()) return;
+
+ if (!has_min_max_) {
+ Copy(other.min_, &this->min_, min_buffer_.get());
+ Copy(other.max_, &this->max_, max_buffer_.get());
+ has_min_max_ = true;
+ return;
+ }
+
+ Compare<T> compare(descr_);
+ Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get());
+ Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get());
+}
+
+template <typename DType>
+std::string TypedRowGroupStatistics<DType>::EncodeMin() {
+ std::string s;
+ if (HasMinMax()) this->PlainEncode(min_, &s);
+ return s;
+}
+
+template <typename DType>
+std::string TypedRowGroupStatistics<DType>::EncodeMax() {
+ std::string s;
+ if (HasMinMax()) this->PlainEncode(max_, &s);
+ return s;
+}
+
+template <typename DType>
+EncodedStatistics TypedRowGroupStatistics<DType>::Encode() {
+ EncodedStatistics s;
+ if (HasMinMax()) {
+ s.set_min(this->EncodeMin());
+ s.set_max(this->EncodeMax());
+ }
+ s.set_null_count(this->null_count());
+ return s;
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::PlainEncode(const T& src, std::string* dst) {
+ PlainEncoder<DType> encoder(descr(), pool_);
+ encoder.Put(&src, 1);
+ auto buffer = encoder.FlushValues();
+ auto ptr = reinterpret_cast<const char*>(buffer->data());
+ dst->assign(ptr, buffer->size());
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::PlainDecode(const std::string& src, T* dst) {
+ PlainDecoder<DType> decoder(descr());
+ decoder.SetData(
+ 1, reinterpret_cast<const uint8_t*>(src.c_str()), static_cast<int>(src.size()));
+ decoder.Decode(dst, 1);
+}
+
+template <>
+void TypedRowGroupStatistics<ByteArrayType>::PlainEncode(const T& src, std::string* dst) {
+ dst->assign(reinterpret_cast<const char*>(src.ptr), src.len);
+}
+
+template <>
+void TypedRowGroupStatistics<ByteArrayType>::PlainDecode(const std::string& src, T* dst) {
+ dst->len = static_cast<uint32_t>(src.size());
+ dst->ptr = reinterpret_cast<const uint8_t*>(src.c_str());
+}
+
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<BooleanType>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int32Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int64Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int96Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FloatType>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<DoubleType>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<ByteArrayType>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FLBAType>;
+
+} // namespace parquet
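
The heart of Update()/UpdateSpaced() above is the usual batch-extrema-then-merge pattern: take the batch min/max with std::minmax_element (or a bitmap-aware scan), then fold the result into the running min/max with the column's comparator. A standalone sketch of the same pattern, with std::less standing in for the library's Compare<T> functor:

    #include <algorithm>
    #include <cstdint>
    #include <functional>

    // Running min/max accumulator illustrating the pattern used by
    // TypedRowGroupStatistics<DType>::Update(); std::less<T> stands in for
    // parquet's Compare<T>.
    template <typename T, typename Less = std::less<T>>
    struct MinMaxAccumulator {
      bool has_min_max = false;
      T min{};
      T max{};

      void Update(const T* values, int64_t num_not_null, Less less = Less()) {
        if (num_not_null == 0) return;  // nothing to fold in
        auto batch = std::minmax_element(values, values + num_not_null, less);
        if (!has_min_max) {
          has_min_max = true;
          min = *batch.first;
          max = *batch.second;
        } else {
          min = std::min(min, *batch.first, less);
          max = std::max(max, *batch.second, less);
        }
      }
    };
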
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/statistics.h
----------------------------------------------------------------------
diff --git a/src/parquet/statistics.h b/src/parquet/statistics.h
new file mode 100644
index 0000000..c6a2487
--- /dev/null
+++ b/src/parquet/statistics.h
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_STATISTICS_H
+#define PARQUET_COLUMN_STATISTICS_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+
+class PARQUET_EXPORT EncodedStatistics {
+ std::shared_ptr<std::string> max_, min_;
+
+ public:
+ EncodedStatistics()
+ : max_(std::make_shared<std::string>()), min_(std::make_shared<std::string>()) {}
+
+ const std::string& max() const { return *max_; }
+ const std::string& min() const { return *min_; }
+
+ int64_t null_count = 0;
+ int64_t distinct_count = 0;
+
+ bool has_min = false;
+ bool has_max = false;
+ bool has_null_count = false;
+ bool has_distinct_count = false;
+
+ inline bool is_set() const {
+ return has_min || has_max || has_null_count || has_distinct_count;
+ }
+
+ inline EncodedStatistics& set_max(const std::string& value) {
+ *max_ = value;
+ has_max = true;
+ return *this;
+ }
+
+ inline EncodedStatistics& set_min(const std::string& value) {
+ *min_ = value;
+ has_min = true;
+ return *this;
+ }
+
+ inline EncodedStatistics& set_null_count(int64_t value) {
+ null_count = value;
+ has_null_count = true;
+ return *this;
+ }
+
+ inline EncodedStatistics& set_distinct_count(int64_t value) {
+ distinct_count = value;
+ has_distinct_count = true;
+ return *this;
+ }
+};
+
+template <typename DType>
+class PARQUET_EXPORT TypedRowGroupStatistics;
+
+class PARQUET_EXPORT RowGroupStatistics
+ : public std::enable_shared_from_this<RowGroupStatistics> {
+ public:
+ int64_t null_count() const { return statistics_.null_count; }
+ int64_t distinct_count() const { return statistics_.distinct_count; }
+ int64_t num_values() const { return num_values_; }
+
+ virtual bool HasMinMax() const = 0;
+ virtual void Reset() = 0;
+
+ // Plain-encoded minimum value
+ virtual std::string EncodeMin() = 0;
+
+ // Plain-encoded maximum value
+ virtual std::string EncodeMax() = 0;
+
+ virtual EncodedStatistics Encode() = 0;
+
+ virtual ~RowGroupStatistics() {}
+
+ Type::type physical_type() const { return descr_->physical_type(); }
+
+ protected:
+ const ColumnDescriptor* descr() const { return descr_; }
+ void SetDescr(const ColumnDescriptor* schema) { descr_ = schema; }
+
+ void IncrementNullCount(int64_t n) { statistics_.null_count += n; }
+
+ void IncrementNumValues(int64_t n) { num_values_ += n; }
+
+ void IncrementDistinctCount(int64_t n) { statistics_.distinct_count += n; }
+
+ void MergeCounts(const RowGroupStatistics& other) {
+ this->statistics_.null_count += other.statistics_.null_count;
+ this->statistics_.distinct_count += other.statistics_.distinct_count;
+ this->num_values_ += other.num_values_;
+ }
+
+ void ResetCounts() {
+ this->statistics_.null_count = 0;
+ this->statistics_.distinct_count = 0;
+ this->num_values_ = 0;
+ }
+
+ const ColumnDescriptor* descr_ = nullptr;
+ int64_t num_values_ = 0;
+ EncodedStatistics statistics_;
+};
+
+template <typename DType>
+class TypedRowGroupStatistics : public RowGroupStatistics {
+ public:
+ using T = typename DType::c_type;
+
+ TypedRowGroupStatistics(const ColumnDescriptor* schema,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+ TypedRowGroupStatistics(const T& min, const T& max, int64_t num_values,
+ int64_t null_count, int64_t distinct_count);
+
+ TypedRowGroupStatistics(const ColumnDescriptor* schema, const std::string& encoded_min,
+ const std::string& encoded_max, int64_t num_values, int64_t null_count,
+ int64_t distinct_count, bool has_min_max,
+ ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+ bool HasMinMax() const override;
+ void Reset() override;
+ void Merge(const TypedRowGroupStatistics<DType>& other);
+
+ void Update(const T* values, int64_t num_not_null, int64_t num_null);
+ void UpdateSpaced(const T* values, const uint8_t* valid_bits, int64_t valid_bits_spaced,
+ int64_t num_not_null, int64_t num_null);
+
+ const T& min() const;
+ const T& max() const;
+
+ std::string EncodeMin() override;
+ std::string EncodeMax() override;
+ EncodedStatistics Encode() override;
+
+ private:
+ bool has_min_max_ = false;
+ T min_;
+ T max_;
+ ::arrow::MemoryPool* pool_;
+
+ void PlainEncode(const T& src, std::string* dst);
+ void PlainDecode(const std::string& src, T* dst);
+ void Copy(const T& src, T* dst, PoolBuffer* buffer);
+
+ std::shared_ptr<PoolBuffer> min_buffer_, max_buffer_;
+};
+
+template <typename DType>
+inline void TypedRowGroupStatistics<DType>::Copy(const T& src, T* dst, PoolBuffer*) {
+ *dst = src;
+}
+
+template <>
+inline void TypedRowGroupStatistics<FLBAType>::Copy(
+ const FLBA& src, FLBA* dst, PoolBuffer* buffer) {
+ if (dst->ptr == src.ptr) return;
+ uint32_t len = descr_->type_length();
+ PARQUET_THROW_NOT_OK(buffer->Resize(len, false));
+ std::memcpy(buffer->mutable_data(), src.ptr, len);
+ *dst = FLBA(buffer->data());
+}
+
+template <>
+inline void TypedRowGroupStatistics<ByteArrayType>::Copy(
+ const ByteArray& src, ByteArray* dst, PoolBuffer* buffer) {
+ if (dst->ptr == src.ptr) return;
+ PARQUET_THROW_NOT_OK(buffer->Resize(src.len, false));
+ std::memcpy(buffer->mutable_data(), src.ptr, src.len);
+ *dst = ByteArray(src.len, buffer->data());
+}
+
+template <>
+void TypedRowGroupStatistics<ByteArrayType>::PlainEncode(const T& src, std::string* dst);
+
+template <>
+void TypedRowGroupStatistics<ByteArrayType>::PlainDecode(const std::string& src, T* dst);
+
+typedef TypedRowGroupStatistics<BooleanType> BoolStatistics;
+typedef TypedRowGroupStatistics<Int32Type> Int32Statistics;
+typedef TypedRowGroupStatistics<Int64Type> Int64Statistics;
+typedef TypedRowGroupStatistics<Int96Type> Int96Statistics;
+typedef TypedRowGroupStatistics<FloatType> FloatStatistics;
+typedef TypedRowGroupStatistics<DoubleType> DoubleStatistics;
+typedef TypedRowGroupStatistics<ByteArrayType> ByteArrayStatistics;
+typedef TypedRowGroupStatistics<FLBAType> FLBAStatistics;
+
+#if defined(__GNUC__) && !defined(__clang__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wattributes"
+#endif
+
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<BooleanType>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int32Type>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int64Type>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int96Type>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<FloatType>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<DoubleType>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<ByteArrayType>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<FLBAType>;
+
+#if defined(__GNUC__) && !defined(__clang__)
+#pragma GCC diagnostic pop
+#endif
+
+} // namespace parquet
+
+#endif // PARQUET_COLUMN_STATISTICS_H
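
Taken together with the tests above, a minimal end-to-end use of the new statistics API might look like the following sketch (a standalone translation unit linked against the library; the column name and values are arbitrary):

    #include <cstdint>
    #include <iostream>
    #include <vector>

    #include "parquet/schema.h"
    #include "parquet/statistics.h"

    using namespace parquet;

    int main() {
      // One-column INT32 schema, as in the statistics tests.
      std::vector<schema::NodePtr> fields = {schema::PrimitiveNode::Make(
          "col1", Repetition::OPTIONAL, Type::INT32, LogicalType::NONE)};
      SchemaDescriptor descr;
      descr.Init(schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));

      Int32Statistics stats(descr.Column(0));
      std::vector<int32_t> batch = {7, -3, 42, 0};
      stats.Update(batch.data(), static_cast<int64_t>(batch.size()), /*num_null=*/1);

      Int32Statistics other(descr.Column(0));
      std::vector<int32_t> batch2 = {100, -50};
      other.Update(batch2.data(), static_cast<int64_t>(batch2.size()), /*num_null=*/0);

      stats.Merge(other);  // widens min/max and accumulates the counts

      std::cout << "min=" << stats.min() << " max=" << stats.max()
                << " nulls=" << stats.null_count()
                << " values=" << stats.num_values() << std::endl;
      return 0;
    }
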
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/test-specialization.h
----------------------------------------------------------------------
diff --git a/src/parquet/test-specialization.h b/src/parquet/test-specialization.h
new file mode 100644
index 0000000..6bd1dee
--- /dev/null
+++ b/src/parquet/test-specialization.h
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines InitValues() specializations and the PrimitiveTypedTest
+// fixture shared by the typed column and statistics tests; it builds on the
+// helpers in test-util.h.
+
+#ifndef PARQUET_COLUMN_TEST_SPECIALIZATION_H
+#define PARQUET_COLUMN_TEST_SPECIALIZATION_H
+
+#include <algorithm>
+#include <limits>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "parquet/test-util.h"
+
+namespace parquet {
+
+namespace test {
+
+template <>
+void InitValues<bool>(int num_values, vector<bool>& values, vector<uint8_t>& buffer) {
+ values = flip_coins(num_values, 0);
+}
+
+template <>
+void InitValues<ByteArray>(
+ int num_values, vector<ByteArray>& values, vector<uint8_t>& buffer) {
+ int max_byte_array_len = 12;
+ int num_bytes = max_byte_array_len + sizeof(uint32_t);
+ size_t nbytes = num_values * num_bytes;
+ buffer.resize(nbytes);
+ random_byte_array(num_values, 0, buffer.data(), values.data(), max_byte_array_len);
+}
+
+template <>
+void InitValues<FLBA>(int num_values, vector<FLBA>& values, vector<uint8_t>& buffer) {
+ size_t nbytes = num_values * FLBA_LENGTH;
+ buffer.resize(nbytes);
+ random_fixed_byte_array(num_values, 0, buffer.data(), FLBA_LENGTH, values.data());
+}
+
+template <>
+void InitValues<Int96>(int num_values, vector<Int96>& values, vector<uint8_t>& buffer) {
+ random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(),
+ std::numeric_limits<int32_t>::max(), values.data());
+}
+
+inline std::string TestColumnName(int i) {
+ std::stringstream col_name;
+ col_name << "column_" << i;
+ return col_name.str();
+}
+
+// This class lives here because of its dependency on the InitValues specializations.
+template <typename TestType>
+class PrimitiveTypedTest : public ::testing::Test {
+ public:
+ typedef typename TestType::c_type T;
+
+ void SetUpSchema(Repetition::type repetition, int num_columns = 1) {
+ std::vector<schema::NodePtr> fields;
+
+ for (int i = 0; i < num_columns; ++i) {
+ std::string name = TestColumnName(i);
+ fields.push_back(schema::PrimitiveNode::Make(
+ name, repetition, TestType::type_num, LogicalType::NONE, FLBA_LENGTH));
+ }
+ node_ = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
+ schema_.Init(node_);
+ }
+
+ void GenerateData(int64_t num_values);
+ void SetupValuesOut(int64_t num_values);
+ void SyncValuesOut();
+
+ protected:
+ schema::NodePtr node_;
+ SchemaDescriptor schema_;
+
+ // Input buffers
+ std::vector<T> values_;
+
+ std::vector<int16_t> def_levels_;
+
+ std::vector<uint8_t> buffer_;
+ // Pointer to the values, needed as we cannot use vector<bool>::data()
+ T* values_ptr_;
+ std::vector<uint8_t> bool_buffer_;
+
+ // Output buffers
+ std::vector<T> values_out_;
+ std::vector<uint8_t> bool_buffer_out_;
+ T* values_out_ptr_;
+};
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::SyncValuesOut() {}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::SyncValuesOut() {
+ std::vector<uint8_t>::const_iterator source_iterator = bool_buffer_out_.begin();
+ std::vector<T>::iterator destination_iterator = values_out_.begin();
+ while (source_iterator != bool_buffer_out_.end()) {
+ *destination_iterator++ = *source_iterator++ != 0;
+ }
+}
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::SetupValuesOut(int64_t num_values) {
+ values_out_.clear();
+ values_out_.resize(num_values);
+ values_out_ptr_ = values_out_.data();
+}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::SetupValuesOut(int64_t num_values) {
+ values_out_.clear();
+ values_out_.resize(num_values);
+
+ bool_buffer_out_.clear();
+ bool_buffer_out_.resize(num_values);
+ // Write once to all values so we can copy it without getting Valgrind errors
+ // about uninitialised values.
+ std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true);
+ values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data());
+}
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::GenerateData(int64_t num_values) {
+ def_levels_.resize(num_values);
+ values_.resize(num_values);
+
+ InitValues<T>(static_cast<int>(num_values), values_, buffer_);
+ values_ptr_ = values_.data();
+
+ std::fill(def_levels_.begin(), def_levels_.end(), 1);
+}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::GenerateData(int64_t num_values) {
+ def_levels_.resize(num_values);
+ values_.resize(num_values);
+
+ InitValues<T>(static_cast<int>(num_values), values_, buffer_);
+ bool_buffer_.resize(num_values);
+ std::copy(values_.begin(), values_.end(), bool_buffer_.begin());
+ values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data());
+
+ std::fill(def_levels_.begin(), def_levels_.end(), 1);
+}
+} // namespace test
+
+} // namespace parquet
+
+#endif // PARQUET_COLUMN_TEST_SPECIALIZATION_H
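
PrimitiveTypedTest is meant to be subclassed by typed gtest fixtures, as statistics-test.cc above does. A sketch of the minimal pattern (the fixture and test names here are hypothetical):

    #include <gtest/gtest.h>

    #include "parquet/test-specialization.h"
    #include "parquet/test-util.h"

    namespace parquet {
    namespace test {

    // Hypothetical fixture: SetUpSchema() builds a one-column schema for the
    // typed physical type, GenerateData() fills values_ / values_ptr_.
    template <typename TestType>
    class TestValueGeneration : public PrimitiveTypedTest<TestType> {};

    using SomeTypes = ::testing::Types<Int32Type, DoubleType, ByteArrayType>;
    TYPED_TEST_CASE(TestValueGeneration, SomeTypes);

    TYPED_TEST(TestValueGeneration, GeneratesRequestedCount) {
      this->SetUpSchema(Repetition::REQUIRED);
      this->GenerateData(128);
      ASSERT_EQ(128U, this->values_.size());
      ASSERT_TRUE(this->schema_.Column(0) != nullptr);
    }

    }  // namespace test
    }  // namespace parquet
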
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
new file mode 100644
index 0000000..8657a7f
--- /dev/null
+++ b/src/parquet/test-util.h
@@ -0,0 +1,430 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines in-memory test helpers for the column reader/writer
+// tests: random value generation, data/dictionary page builders, and a
+// MockPageReader for iterating over the generated pages.

+
+#ifndef PARQUET_COLUMN_TEST_UTIL_H
+#define PARQUET_COLUMN_TEST_UTIL_H
+
+#include <algorithm>
+#include <limits>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/encoding-internal.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/test-common.h"
+
+using std::vector;
+using std::shared_ptr;
+
+namespace parquet {
+
+static int FLBA_LENGTH = 12;
+
+bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
+ return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
+}
+
+namespace test {
+
+template <typename T>
+static void InitValues(int num_values, vector<T>& values, vector<uint8_t>& buffer) {
+ random_numbers(num_values, 0, std::numeric_limits<T>::min(),
+ std::numeric_limits<T>::max(), values.data());
+}
+
+template <typename T>
+static void InitDictValues(
+ int num_values, int num_dicts, vector<T>& values, vector<uint8_t>& buffer) {
+ int repeat_factor = num_values / num_dicts;
+ InitValues<T>(num_dicts, values, buffer);
+ // add some repeated values
+ for (int j = 1; j < repeat_factor; ++j) {
+ for (int i = 0; i < num_dicts; ++i) {
+ std::memcpy(&values[num_dicts * j + i], &values[i], sizeof(T));
+ }
+ }
+ // Only num_dicts * repeat_factor values have been filled so far, which can
+ // be fewer than num_values; fill the remainder by repeating earlier values.
+ for (int i = num_dicts * repeat_factor; i < num_values; ++i) {
+ std::memcpy(&values[i], &values[i - num_dicts * repeat_factor], sizeof(T));
+ }
+}
+
+class MockPageReader : public PageReader {
+ public:
+ explicit MockPageReader(const vector<shared_ptr<Page>>& pages)
+ : pages_(pages), page_index_(0) {}
+
+ // Implement the PageReader interface
+ virtual shared_ptr<Page> NextPage() {
+ if (page_index_ == static_cast<int>(pages_.size())) {
+ // EOS to consumer
+ return shared_ptr<Page>(nullptr);
+ }
+ return pages_[page_index_++];
+ }
+
+ private:
+ vector<shared_ptr<Page>> pages_;
+ int page_index_;
+};
+
+// TODO(wesm): this is only used for testing for now. Refactor to form part of
+// primary file write path
+template <typename Type>
+class DataPageBuilder {
+ public:
+ typedef typename Type::c_type T;
+
+ // This class writes data and metadata to the passed inputs
+ explicit DataPageBuilder(InMemoryOutputStream* sink)
+ : sink_(sink),
+ num_values_(0),
+ encoding_(Encoding::PLAIN),
+ definition_level_encoding_(Encoding::RLE),
+ repetition_level_encoding_(Encoding::RLE),
+ have_def_levels_(false),
+ have_rep_levels_(false),
+ have_values_(false) {}
+
+ void AppendDefLevels(const vector<int16_t>& levels, int16_t max_level,
+ Encoding::type encoding = Encoding::RLE) {
+ AppendLevels(levels, max_level, encoding);
+
+ num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
+ definition_level_encoding_ = encoding;
+ have_def_levels_ = true;
+ }
+
+ void AppendRepLevels(const vector<int16_t>& levels, int16_t max_level,
+ Encoding::type encoding = Encoding::RLE) {
+ AppendLevels(levels, max_level, encoding);
+
+ num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
+ repetition_level_encoding_ = encoding;
+ have_rep_levels_ = true;
+ }
+
+ void AppendValues(const ColumnDescriptor* d, const vector<T>& values,
+ Encoding::type encoding = Encoding::PLAIN) {
+ PlainEncoder<Type> encoder(d);
+ encoder.Put(&values[0], static_cast<int>(values.size()));
+ std::shared_ptr<Buffer> values_sink = encoder.FlushValues();
+ sink_->Write(values_sink->data(), values_sink->size());
+
+ num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
+ encoding_ = encoding;
+ have_values_ = true;
+ }
+
+ int32_t num_values() const { return num_values_; }
+
+ Encoding::type encoding() const { return encoding_; }
+
+ Encoding::type rep_level_encoding() const { return repetition_level_encoding_; }
+
+ Encoding::type def_level_encoding() const { return definition_level_encoding_; }
+
+ private:
+ InMemoryOutputStream* sink_;
+
+ int32_t num_values_;
+ Encoding::type encoding_;
+ Encoding::type definition_level_encoding_;
+ Encoding::type repetition_level_encoding_;
+
+ bool have_def_levels_;
+ bool have_rep_levels_;
+ bool have_values_;
+
+ // Used internally for both repetition and definition levels
+ void AppendLevels(
+ const vector<int16_t>& levels, int16_t max_level, Encoding::type encoding) {
+ if (encoding != Encoding::RLE) {
+ ParquetException::NYI("only rle encoding currently implemented");
+ }
+
+ // TODO: compute a more precise maximum size for the encoded levels
+ vector<uint8_t> encode_buffer(levels.size() * 2);
+
+ // We encode into separate memory from the output stream because the
+ // RLE-encoded bytes have to be preceded in the stream by their absolute
+ // size.
+ LevelEncoder encoder;
+ encoder.Init(encoding, max_level, static_cast<int>(levels.size()),
+ encode_buffer.data(), static_cast<int>(encode_buffer.size()));
+
+ encoder.Encode(static_cast<int>(levels.size()), levels.data());
+
+ int32_t rle_bytes = encoder.len();
+ sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t));
+ sink_->Write(encode_buffer.data(), rle_bytes);
+ }
+};
+
+template <>
+void DataPageBuilder<BooleanType>::AppendValues(
+ const ColumnDescriptor* d, const vector<bool>& values, Encoding::type encoding) {
+ if (encoding != Encoding::PLAIN) {
+ ParquetException::NYI("only plain encoding currently implemented");
+ }
+ PlainEncoder<BooleanType> encoder(d);
+ encoder.Put(values, static_cast<int>(values.size()));
+ std::shared_ptr<Buffer> buffer = encoder.FlushValues();
+ sink_->Write(buffer->data(), buffer->size());
+
+ num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
+ encoding_ = encoding;
+ have_values_ = true;
+}
+
+template <typename Type>
+static shared_ptr<DataPage> MakeDataPage(const ColumnDescriptor* d,
+ const vector<typename Type::c_type>& values, int num_vals, Encoding::type encoding,
+ const uint8_t* indices, int indices_size, const vector<int16_t>& def_levels,
+ int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level) {
+ int num_values = 0;
+
+ InMemoryOutputStream page_stream;
+ test::DataPageBuilder<Type> page_builder(&page_stream);
+
+ if (!rep_levels.empty()) { page_builder.AppendRepLevels(rep_levels, max_rep_level); }
+ if (!def_levels.empty()) { page_builder.AppendDefLevels(def_levels, max_def_level); }
+
+ if (encoding == Encoding::PLAIN) {
+ page_builder.AppendValues(d, values, encoding);
+ num_values = page_builder.num_values();
+ } else { // DICTIONARY PAGES
+ page_stream.Write(indices, indices_size);
+ num_values = std::max(page_builder.num_values(), num_vals);
+ }
+
+ auto buffer = page_stream.GetBuffer();
+
+ return std::make_shared<DataPage>(buffer, num_values, encoding,
+ page_builder.def_level_encoding(), page_builder.rep_level_encoding());
+}
+
+template <typename TYPE>
+class DictionaryPageBuilder {
+ public:
+ typedef typename TYPE::c_type TC;
+ static constexpr int TN = TYPE::type_num;
+
+ // This class writes data and metadata to the passed inputs
+ explicit DictionaryPageBuilder(const ColumnDescriptor* d)
+ : num_dict_values_(0), have_values_(false) {
+ encoder_.reset(new DictEncoder<TYPE>(d, &pool_));
+ }
+
+ ~DictionaryPageBuilder() { pool_.FreeAll(); }
+
+ shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
+ int num_values = static_cast<int>(values.size());
+ // Dictionary encoding
+ encoder_->Put(values.data(), num_values);
+ num_dict_values_ = encoder_->num_entries();
+ have_values_ = true;
+ return encoder_->FlushValues();
+ }
+
+ shared_ptr<Buffer> WriteDict() {
+ std::shared_ptr<PoolBuffer> dict_buffer =
+ AllocateBuffer(::arrow::default_memory_pool(), encoder_->dict_encoded_size());
+ encoder_->WriteDict(dict_buffer->mutable_data());
+ return dict_buffer;
+ }
+
+ int32_t num_values() const { return num_dict_values_; }
+
+ private:
+ ChunkedAllocator pool_;
+ shared_ptr<DictEncoder<TYPE>> encoder_;
+ int32_t num_dict_values_;
+ bool have_values_;
+};
+
+template <>
+DictionaryPageBuilder<BooleanType>::DictionaryPageBuilder(const ColumnDescriptor* d) {
+ ParquetException::NYI("only plain encoding currently implemented for boolean");
+}
+
+template <>
+shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::WriteDict() {
+ ParquetException::NYI("only plain encoding currently implemented for boolean");
+ return nullptr;
+}
+
+template <>
+shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::AppendValues(
+ const vector<TC>& values) {
+ ParquetException::NYI("only plain encoding currently implemented for boolean");
+ return nullptr;
+}
+
+template <typename Type>
+static shared_ptr<DictionaryPage> MakeDictPage(const ColumnDescriptor* d,
+ const vector<typename Type::c_type>& values, const vector<int>& values_per_page,
+ Encoding::type encoding, vector<shared_ptr<Buffer>>& rle_indices) {
+ InMemoryOutputStream page_stream;
+ test::DictionaryPageBuilder<Type> page_builder(d);
+ int num_pages = static_cast<int>(values_per_page.size());
+ int value_start = 0;
+
+ for (int i = 0; i < num_pages; i++) {
+ rle_indices.push_back(page_builder.AppendValues(
+ slice(values, value_start, value_start + values_per_page[i])));
+ value_start += values_per_page[i];
+ }
+
+ auto buffer = page_builder.WriteDict();
+
+ return std::make_shared<DictionaryPage>(
+ buffer, page_builder.num_values(), Encoding::PLAIN);
+}
+
+// Given def/rep levels and values create multiple dict pages
+template <typename Type>
+static void PaginateDict(const ColumnDescriptor* d,
+ const vector<typename Type::c_type>& values, const vector<int16_t>& def_levels,
+ int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level,
+ int num_levels_per_page, const vector<int>& values_per_page,
+ vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::RLE_DICTIONARY) {
+ int num_pages = static_cast<int>(values_per_page.size());
+ vector<shared_ptr<Buffer>> rle_indices;
+ shared_ptr<DictionaryPage> dict_page =
+ MakeDictPage<Type>(d, values, values_per_page, encoding, rle_indices);
+ pages.push_back(dict_page);
+ int def_level_start = 0;
+ int def_level_end = 0;
+ int rep_level_start = 0;
+ int rep_level_end = 0;
+ for (int i = 0; i < num_pages; i++) {
+ if (max_def_level > 0) {
+ def_level_start = i * num_levels_per_page;
+ def_level_end = (i + 1) * num_levels_per_page;
+ }
+ if (max_rep_level > 0) {
+ rep_level_start = i * num_levels_per_page;
+ rep_level_end = (i + 1) * num_levels_per_page;
+ }
+ shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(d, {}, values_per_page[i],
+ encoding, rle_indices[i]->data(), static_cast<int>(rle_indices[i]->size()),
+ slice(def_levels, def_level_start, def_level_end), max_def_level,
+ slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
+ pages.push_back(data_page);
+ }
+}
+
+// Given def/rep levels and values create multiple plain pages
+template <typename Type>
+static void PaginatePlain(const ColumnDescriptor* d,
+ const vector<typename Type::c_type>& values, const vector<int16_t>& def_levels,
+ int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level,
+ int num_levels_per_page, const vector<int>& values_per_page,
+ vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::PLAIN) {
+ int num_pages = static_cast<int>(values_per_page.size());
+ int def_level_start = 0;
+ int def_level_end = 0;
+ int rep_level_start = 0;
+ int rep_level_end = 0;
+ int value_start = 0;
+ for (int i = 0; i < num_pages; i++) {
+ if (max_def_level > 0) {
+ def_level_start = i * num_levels_per_page;
+ def_level_end = (i + 1) * num_levels_per_page;
+ }
+ if (max_rep_level > 0) {
+ rep_level_start = i * num_levels_per_page;
+ rep_level_end = (i + 1) * num_levels_per_page;
+ }
+ shared_ptr<DataPage> page = MakeDataPage<Type>(d,
+ slice(values, value_start, value_start + values_per_page[i]), values_per_page[i],
+ encoding, NULL, 0, slice(def_levels, def_level_start, def_level_end),
+ max_def_level, slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
+ pages.push_back(page);
+ value_start += values_per_page[i];
+ }
+}
+
+// Generates pages from randomly generated data
+template <typename Type>
+static int MakePages(const ColumnDescriptor* d, int num_pages, int levels_per_page,
+ vector<int16_t>& def_levels, vector<int16_t>& rep_levels,
+ vector<typename Type::c_type>& values, vector<uint8_t>& buffer,
+ vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::PLAIN) {
+ int num_levels = levels_per_page * num_pages;
+ int num_values = 0;
+ uint32_t seed = 0;
+ int16_t zero = 0;
+ int16_t max_def_level = d->max_definition_level();
+ int16_t max_rep_level = d->max_repetition_level();
+ vector<int> values_per_page(num_pages, levels_per_page);
+ // Create definition levels
+ if (max_def_level > 0) {
+ def_levels.resize(num_levels);
+ random_numbers(num_levels, seed, zero, max_def_level, def_levels.data());
+ for (int p = 0; p < num_pages; p++) {
+ int num_values_per_page = 0;
+ for (int i = 0; i < levels_per_page; i++) {
+ if (def_levels[i + p * levels_per_page] == max_def_level) {
+ num_values_per_page++;
+ num_values++;
+ }
+ }
+ values_per_page[p] = num_values_per_page;
+ }
+ } else {
+ num_values = num_levels;
+ }
+ // Create repetition levels
+ if (max_rep_level > 0) {
+ rep_levels.resize(num_levels);
+ random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data());
+ }
+ // Create values
+ values.resize(num_values);
+ if (encoding == Encoding::PLAIN) {
+ InitValues<typename Type::c_type>(num_values, values, buffer);
+ PaginatePlain<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level,
+ levels_per_page, values_per_page, pages);
+ } else if (encoding == Encoding::RLE_DICTIONARY ||
+ encoding == Encoding::PLAIN_DICTIONARY) {
+ // Calls InitValues and repeats the data
+ InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer);
+ PaginateDict<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level,
+ levels_per_page, values_per_page, pages);
+ }
+
+ return num_values;
+}
+
+} // namespace test
+
+} // namespace parquet
+
+#endif // PARQUET_COLUMN_TEST_UTIL_H
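
MakePages() plus MockPageReader give the tests an in-memory column chunk without touching a file. A sketch of generating pages for a single column and draining them, as the reader tests do (the function and variable names here are arbitrary):

    #include <memory>
    #include <vector>

    #include "parquet/schema.h"
    #include "parquet/test-util.h"

    void ExercisePages() {
      using namespace parquet;
      using parquet::test::MakePages;
      using parquet::test::MockPageReader;

      // One OPTIONAL INT32 column so MakePages also generates definition levels.
      std::vector<schema::NodePtr> fields = {schema::PrimitiveNode::Make(
          "col", Repetition::OPTIONAL, Type::INT32, LogicalType::NONE)};
      SchemaDescriptor descr;
      descr.Init(schema::GroupNode::Make("schema", Repetition::REQUIRED, fields));
      const ColumnDescriptor* d = descr.Column(0);

      std::vector<int16_t> def_levels, rep_levels;
      std::vector<int32_t> values;
      std::vector<uint8_t> buffer;
      std::vector<std::shared_ptr<Page>> pages;

      // 4 plain-encoded pages of 100 levels each; returns the non-null count.
      int num_values =
          MakePages<Int32Type>(d, 4, 100, def_levels, rep_levels, values, buffer, pages);
      (void)num_values;

      MockPageReader reader(pages);
      while (std::shared_ptr<Page> page = reader.NextPage()) {
        // hand each generated data page to the code under test
      }
    }
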
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/util/memory.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index 4f780c4..2e7eb0f 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -46,20 +46,16 @@ static inline std::unique_ptr<::arrow::Codec> GetCodecFromArrow(Compression::typ
case Compression::UNCOMPRESSED:
break;
case Compression::SNAPPY:
- PARQUET_THROW_NOT_OK(
- ::arrow::Codec::Create(::arrow::Compression::SNAPPY, &result));
+ PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::SNAPPY, &result));
break;
case Compression::GZIP:
- PARQUET_THROW_NOT_OK(
- ::arrow::Codec::Create(::arrow::Compression::GZIP, &result));
+ PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::GZIP, &result));
break;
case Compression::LZO:
- PARQUET_THROW_NOT_OK(
- ::arrow::Codec::Create(::arrow::Compression::LZO, &result));
+ PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::LZO, &result));
break;
case Compression::BROTLI:
- PARQUET_THROW_NOT_OK(
- ::arrow::Codec::Create(::arrow::Compression::BROTLI, &result));
+ PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::BROTLI, &result));
break;
default:
break;
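
The reformatted switch above only re-wraps the PARQUET_THROW_NOT_OK() calls; usage of the helper is unchanged. A hedged sketch, assuming the surrounding function returns the created codec and a null pointer for UNCOMPRESSED or unknown values, as the untouched branches suggest:

    #include <memory>

    #include "parquet/util/memory.h"

    void Example() {
      std::unique_ptr<::arrow::Codec> codec =
          parquet::GetCodecFromArrow(parquet::Compression::SNAPPY);
      if (codec != nullptr) {
        // compress/decompress buffers through Arrow's Codec interface
      }
    }
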
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/util/schema-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/schema-util.h b/src/parquet/util/schema-util.h
index 618d21e..e199c21 100644
--- a/src/parquet/util/schema-util.h
+++ b/src/parquet/util/schema-util.h
@@ -19,8 +19,8 @@
#define PARQUET_SCHEMA_UTIL_H
#include <string>
-#include <vector>
#include <unordered_set>
+#include <vector>
#include "parquet/exception.h"
#include "parquet/schema.h"
@@ -43,8 +43,7 @@ inline bool str_endswith_tuple(const std::string& str) {
// If the name is array or ends in _tuple, this should be a list of struct
// even for single child elements.
inline bool HasStructListName(const GroupNode& node) {
- return (node.name() == "array" ||
- str_endswith_tuple(node.name()));
+ return (node.name() == "array" || str_endswith_tuple(node.name()));
}
// TODO(itaiin): This aux. function is to be deleted once repeated structs are supported
@@ -53,8 +52,8 @@ inline bool IsSimpleStruct(const NodePtr& node) {
if (node->is_repeated()) return false;
if (node->logical_type() == LogicalType::LIST) return false;
// Special case mentioned in the format spec:
- // If the name is array or ends in _tuple, this should be a list of struct
- // even for single child elements.
+ // If the name is array or ends in _tuple, this should be a list of struct
+ // even for single child elements.
auto group = static_cast<const GroupNode*>(node.get());
if (group->field_count() == 1 && HasStructListName(*group)) return false;
@@ -71,9 +70,7 @@ inline bool ColumnIndicesToFieldIndices(const SchemaDescriptor& descr,
for (auto& column_idx : column_indices) {
auto field_node = descr.GetColumnRoot(column_idx);
auto field_idx = group->FieldIndex(field_node->name());
- if (field_idx < 0) {
- return false;
- }
+ if (field_idx < 0) { return false; }
auto insertion = already_added.insert(field_idx);
if (insertion.second) { out->push_back(field_idx); }
}
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/util/visibility.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/visibility.h b/src/parquet/util/visibility.h
index 1601335..984fac2 100644
--- a/src/parquet/util/visibility.h
+++ b/src/parquet/util/visibility.h
@@ -21,7 +21,8 @@
#if defined(_WIN32) || defined(__CYGWIN__)
#ifdef _MSC_VER
#pragma warning(push)
-// Disable warning for STL types usage in DLL interface https://web.archive.org/web/20130317015847/http://connect.microsoft.com/VisualStudio/feedback/details/696593/vc-10-vs-2010-basic-string-exports
+// Disable warning for STL types usage in DLL interface
+// https://web.archive.org/web/20130317015847/http://connect.microsoft.com/VisualStudio/feedback/details/696593/vc-10-vs-2010-basic-string-exports
#pragma warning(disable : 4275 4251)
// Disable diamond inheritance warnings
#pragma warning(disable : 4250)
|