parquet-commits mailing list archives

From u..@apache.org
Subject [1/6] parquet-cpp git commit: PARQUET-858: Flatten column directory, minor code consolidation
Date Mon, 26 Jun 2017 07:05:24 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 491182c22 -> 84db929ec


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc
index 58f23df..203a312 100644
--- a/src/parquet/schema-test.cc
+++ b/src/parquet/schema-test.cc
@@ -337,13 +337,13 @@ TEST_F(TestGroupNode, FieldIndex) {
   NodeVector fields = Fields1();
   GroupNode group("group", Repetition::REQUIRED, fields);
   for (size_t i = 0; i < fields.size(); i++) {
-      auto field = group.field(static_cast<int>(i));
-      ASSERT_EQ(i, group.FieldIndex(*field.get()));
+    auto field = group.field(static_cast<int>(i));
+    ASSERT_EQ(i, group.FieldIndex(*field.get()));
   }
 
   // Test a non field node
-  auto non_field_alien = Int32("alien", Repetition::REQUIRED); // other name
-  auto non_field_familiar = Int32("one", Repetition::REPEATED); // other node
+  auto non_field_alien = Int32("alien", Repetition::REQUIRED);   // other name
+  auto non_field_familiar = Int32("one", Repetition::REPEATED);  // other node
   ASSERT_TRUE(group.FieldIndex(*non_field_alien.get()) < 0);
   ASSERT_TRUE(group.FieldIndex(*non_field_familiar.get()) < 0);
 }
@@ -664,13 +664,13 @@ TEST_F(TestSchemaDescriptor, BuildTree) {
   ASSERT_EQ(descr_.Column(5)->path()->ToDotString(), "bag.records.item3");
 
   for (int i = 0; i < nleaves; ++i) {
-      auto col = descr_.Column(i);
-      ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node().get()));
+    auto col = descr_.Column(i);
+    ASSERT_EQ(i, descr_.ColumnIndex(*col->schema_node().get()));
   }
 
   // Test non-column nodes find
-  NodePtr non_column_alien = Int32("alien", Repetition::REQUIRED); // other path
-  NodePtr non_column_familiar = Int32("a", Repetition::REPEATED); // other node
+  NodePtr non_column_alien = Int32("alien", Repetition::REQUIRED);  // other path
+  NodePtr non_column_familiar = Int32("a", Repetition::REPEATED);   // other node
   ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien.get()) < 0);
   ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar.get()) < 0);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/statistics-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
new file mode 100644
index 0000000..cbc761d
--- /dev/null
+++ b/src/parquet/statistics-test.cc
@@ -0,0 +1,358 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <array>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/file/reader.h"
+#include "parquet/file/writer.h"
+#include "parquet/schema.h"
+#include "parquet/statistics.h"
+#include "parquet/test-specialization.h"
+#include "parquet/test-util.h"
+#include "parquet/thrift.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+
+namespace parquet {
+
+using schema::NodePtr;
+using schema::PrimitiveNode;
+using schema::GroupNode;
+
+namespace test {
+
+template <typename TestType>
+class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> {
+ public:
+  using T = typename TestType::c_type;
+  using TypedStats = TypedRowGroupStatistics<TestType>;
+
+  std::vector<T> GetDeepCopy(
+      const std::vector<T>&);  // allocates new memory for FLBA/ByteArray
+
+  T* GetValuesPointer(std::vector<T>&);
+  void DeepFree(std::vector<T>&);
+
+  void TestMinMaxEncode() {
+    this->GenerateData(1000);
+
+    TypedStats statistics1(this->schema_.Column(0));
+    statistics1.Update(this->values_ptr_, this->values_.size(), 0);
+    std::string encoded_min = statistics1.EncodeMin();
+    std::string encoded_max = statistics1.EncodeMax();
+
+    TypedStats statistics2(this->schema_.Column(0), encoded_min, encoded_max,
+        this->values_.size(), 0, 0, true);
+
+    TypedStats statistics3(this->schema_.Column(0));
+    std::vector<uint8_t> valid_bits(
+        BitUtil::RoundUpNumBytes(static_cast<uint32_t>(this->values_.size())) + 1, 255);
+    statistics3.UpdateSpaced(
+        this->values_ptr_, valid_bits.data(), 0, this->values_.size(), 0);
+    std::string encoded_min_spaced = statistics3.EncodeMin();
+    std::string encoded_max_spaced = statistics3.EncodeMax();
+
+    ASSERT_EQ(encoded_min, statistics2.EncodeMin());
+    ASSERT_EQ(encoded_max, statistics2.EncodeMax());
+    ASSERT_EQ(statistics1.min(), statistics2.min());
+    ASSERT_EQ(statistics1.max(), statistics2.max());
+    ASSERT_EQ(encoded_min_spaced, statistics2.EncodeMin());
+    ASSERT_EQ(encoded_max_spaced, statistics2.EncodeMax());
+    ASSERT_EQ(statistics3.min(), statistics2.min());
+    ASSERT_EQ(statistics3.max(), statistics2.max());
+  }
+
+  void TestReset() {
+    this->GenerateData(1000);
+
+    TypedStats statistics(this->schema_.Column(0));
+    statistics.Update(this->values_ptr_, this->values_.size(), 0);
+    ASSERT_EQ(this->values_.size(), statistics.num_values());
+
+    statistics.Reset();
+    ASSERT_EQ(0, statistics.null_count());
+    ASSERT_EQ(0, statistics.num_values());
+    ASSERT_EQ("", statistics.EncodeMin());
+    ASSERT_EQ("", statistics.EncodeMax());
+  }
+
+  void TestMerge() {
+    int num_null[2];
+    random_numbers(2, 42, 0, 100, num_null);
+
+    TypedStats statistics1(this->schema_.Column(0));
+    this->GenerateData(1000);
+    statistics1.Update(
+        this->values_ptr_, this->values_.size() - num_null[0], num_null[0]);
+
+    TypedStats statistics2(this->schema_.Column(0));
+    this->GenerateData(1000);
+    statistics2.Update(
+        this->values_ptr_, this->values_.size() - num_null[1], num_null[1]);
+
+    TypedStats total(this->schema_.Column(0));
+    total.Merge(statistics1);
+    total.Merge(statistics2);
+
+    ASSERT_EQ(num_null[0] + num_null[1], total.null_count());
+    ASSERT_EQ(this->values_.size() * 2 - num_null[0] - num_null[1], total.num_values());
+    ASSERT_EQ(total.min(), std::min(statistics1.min(), statistics2.min()));
+    ASSERT_EQ(total.max(), std::max(statistics1.max(), statistics2.max()));
+  }
+
+  void TestFullRoundtrip(int64_t num_values, int64_t null_count) {
+    this->GenerateData(num_values);
+
+    // compute statistics for the whole batch
+    TypedStats expected_stats(this->schema_.Column(0));
+    expected_stats.Update(this->values_ptr_, num_values - null_count, null_count);
+
+    auto sink = std::make_shared<InMemoryOutputStream>();
+    auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
+    std::shared_ptr<WriterProperties> writer_properties =
+        WriterProperties::Builder().enable_statistics("column")->build();
+    auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
+    auto row_group_writer = file_writer->AppendRowGroup(num_values);
+    auto column_writer =
+        static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
+
+    // simulate the case when data comes from multiple buffers,
+    // in which case special care is necessary for FLBA/ByteArray types
+    for (int i = 0; i < 2; i++) {
+      int64_t batch_num_values = i ? num_values - num_values / 2 : num_values / 2;
+      int64_t batch_null_count = i ? null_count : 0;
+      DCHECK(null_count <= num_values);  // avoid too much headache
+      std::vector<int16_t> definition_levels(batch_null_count, 0);
+      definition_levels.insert(
+          definition_levels.end(), batch_num_values - batch_null_count, 1);
+      auto beg = this->values_.begin() + i * num_values / 2;
+      auto end = beg + batch_num_values;
+      std::vector<T> batch = GetDeepCopy(std::vector<T>(beg, end));
+      T* batch_values_ptr = GetValuesPointer(batch);
+      column_writer->WriteBatch(
+          batch_num_values, definition_levels.data(), nullptr, batch_values_ptr);
+      DeepFree(batch);
+    }
+    column_writer->Close();
+    row_group_writer->Close();
+    file_writer->Close();
+
+    auto buffer = sink->GetBuffer();
+    auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
+    auto file_reader = ParquetFileReader::Open(source);
+    auto rg_reader = file_reader->RowGroup(0);
+    auto column_chunk = rg_reader->metadata()->ColumnChunk(0);
+    if (!column_chunk->is_stats_set()) return;
+    std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics();
+    // check values after serialization + deserialization
+    ASSERT_EQ(null_count, stats->null_count());
+    ASSERT_EQ(num_values - null_count, stats->num_values());
+    ASSERT_EQ(expected_stats.EncodeMin(), stats->EncodeMin());
+    ASSERT_EQ(expected_stats.EncodeMax(), stats->EncodeMax());
+  }
+};
+
+template <typename TestType>
+typename TestType::c_type* TestRowGroupStatistics<TestType>::GetValuesPointer(
+    std::vector<typename TestType::c_type>& values) {
+  return values.data();
+}
+
+template <>
+bool* TestRowGroupStatistics<BooleanType>::GetValuesPointer(std::vector<bool>& values) {
+  static std::vector<uint8_t> bool_buffer;
+  bool_buffer.clear();
+  bool_buffer.resize(values.size());
+  std::copy(values.begin(), values.end(), bool_buffer.begin());
+  return reinterpret_cast<bool*>(bool_buffer.data());
+}
+
+template <typename TestType>
+typename std::vector<typename TestType::c_type>
+TestRowGroupStatistics<TestType>::GetDeepCopy(
+    const std::vector<typename TestType::c_type>& values) {
+  return values;
+}
+
+template <>
+std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy(
+    const std::vector<FLBA>& values) {
+  std::vector<FLBA> copy;
+  MemoryPool* pool = ::arrow::default_memory_pool();
+  for (const FLBA& flba : values) {
+    uint8_t* ptr;
+    PARQUET_THROW_NOT_OK(pool->Allocate(FLBA_LENGTH, &ptr));
+    memcpy(ptr, flba.ptr, FLBA_LENGTH);
+    copy.emplace_back(ptr);
+  }
+  return copy;
+}
+
+template <>
+std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy(
+    const std::vector<ByteArray>& values) {
+  std::vector<ByteArray> copy;
+  MemoryPool* pool = default_memory_pool();
+  for (const ByteArray& ba : values) {
+    uint8_t* ptr;
+    PARQUET_THROW_NOT_OK(pool->Allocate(ba.len, &ptr));
+    memcpy(ptr, ba.ptr, ba.len);
+    copy.emplace_back(ba.len, ptr);
+  }
+  return copy;
+}
+
+template <typename TestType>
+void TestRowGroupStatistics<TestType>::DeepFree(
+    std::vector<typename TestType::c_type>& values) {}
+
+template <>
+void TestRowGroupStatistics<FLBAType>::DeepFree(std::vector<FLBA>& values) {
+  MemoryPool* pool = default_memory_pool();
+  for (FLBA& flba : values) {
+    auto ptr = const_cast<uint8_t*>(flba.ptr);
+    memset(ptr, 0, FLBA_LENGTH);
+    pool->Free(ptr, FLBA_LENGTH);
+  }
+}
+
+template <>
+void TestRowGroupStatistics<ByteArrayType>::DeepFree(std::vector<ByteArray>& values) {
+  MemoryPool* pool = default_memory_pool();
+  for (ByteArray& ba : values) {
+    auto ptr = const_cast<uint8_t*>(ba.ptr);
+    memset(ptr, 0, ba.len);
+    pool->Free(ptr, ba.len);
+  }
+}
+
+template <>
+void TestRowGroupStatistics<ByteArrayType>::TestMinMaxEncode() {
+  this->GenerateData(1000);
+  // Test that we encode min max strings correctly
+  TypedRowGroupStatistics<ByteArrayType> statistics1(this->schema_.Column(0));
+  statistics1.Update(this->values_ptr_, this->values_.size(), 0);
+  std::string encoded_min = statistics1.EncodeMin();
+  std::string encoded_max = statistics1.EncodeMax();
+
+  // encoded is same as unencoded
+  ASSERT_EQ(encoded_min,
+      std::string((const char*)statistics1.min().ptr, statistics1.min().len));
+  ASSERT_EQ(encoded_max,
+      std::string((const char*)statistics1.max().ptr, statistics1.max().len));
+
+  TypedRowGroupStatistics<ByteArrayType> statistics2(this->schema_.Column(0), encoded_min,
+      encoded_max, this->values_.size(), 0, 0, true);
+
+  ASSERT_EQ(encoded_min, statistics2.EncodeMin());
+  ASSERT_EQ(encoded_max, statistics2.EncodeMax());
+  ASSERT_EQ(statistics1.min(), statistics2.min());
+  ASSERT_EQ(statistics1.max(), statistics2.max());
+}
+
+using TestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+    ByteArrayType, FLBAType, BooleanType>;
+
+TYPED_TEST_CASE(TestRowGroupStatistics, TestTypes);
+
+TYPED_TEST(TestRowGroupStatistics, MinMaxEncode) {
+  this->SetUpSchema(Repetition::REQUIRED);
+  this->TestMinMaxEncode();
+}
+
+TYPED_TEST(TestRowGroupStatistics, Reset) {
+  this->SetUpSchema(Repetition::OPTIONAL);
+  this->TestReset();
+}
+
+TYPED_TEST(TestRowGroupStatistics, FullRoundtrip) {
+  this->SetUpSchema(Repetition::OPTIONAL);
+  this->TestFullRoundtrip(100, 31);
+  this->TestFullRoundtrip(1000, 415);
+  this->TestFullRoundtrip(10000, 926);
+}
+
+template <typename TestType>
+class TestNumericRowGroupStatistics : public TestRowGroupStatistics<TestType> {};
+
+using NumericTypes = ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType>;
+
+TYPED_TEST_CASE(TestNumericRowGroupStatistics, NumericTypes);
+
+TYPED_TEST(TestNumericRowGroupStatistics, Merge) {
+  this->SetUpSchema(Repetition::OPTIONAL);
+  this->TestMerge();
+}
+
+TEST(CorruptStatistics, Basics) {
+  ApplicationVersion version("parquet-mr version 1.8.0");
+  SchemaDescriptor schema;
+  schema::NodePtr node;
+  std::vector<schema::NodePtr> fields;
+  // Test Physical Types
+  fields.push_back(schema::PrimitiveNode::Make(
+      "col1", Repetition::OPTIONAL, Type::INT32, LogicalType::NONE));
+  fields.push_back(schema::PrimitiveNode::Make(
+      "col2", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::NONE));
+  // Test Logical Types
+  fields.push_back(schema::PrimitiveNode::Make(
+      "col3", Repetition::OPTIONAL, Type::INT32, LogicalType::DATE));
+  fields.push_back(schema::PrimitiveNode::Make(
+      "col4", Repetition::OPTIONAL, Type::INT32, LogicalType::UINT_32));
+  fields.push_back(schema::PrimitiveNode::Make("col5", Repetition::OPTIONAL,
+      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, 12));
+  fields.push_back(schema::PrimitiveNode::Make(
+      "col6", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::UTF8));
+  node = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
+  schema.Init(node);
+
+  format::ColumnChunk col_chunk;
+  col_chunk.meta_data.__isset.statistics = true;
+  auto column_chunk1 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(0), &version);
+  ASSERT_TRUE(column_chunk1->is_stats_set());
+  auto column_chunk2 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(1), &version);
+  ASSERT_FALSE(column_chunk2->is_stats_set());
+  auto column_chunk3 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(2), &version);
+  ASSERT_TRUE(column_chunk3->is_stats_set());
+  auto column_chunk4 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(3), &version);
+  ASSERT_FALSE(column_chunk4->is_stats_set());
+  auto column_chunk5 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(4), &version);
+  ASSERT_FALSE(column_chunk5->is_stats_set());
+  auto column_chunk6 = ColumnChunkMetaData::Make(
+      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(5), &version);
+  ASSERT_FALSE(column_chunk6->is_stats_set());
+}
+
+}  // namespace test
+}  // namespace parquet
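
The new TypedRowGroupStatistics API exercised above (and defined in statistics.h/statistics.cc below) boils down to a small write-side flow: accumulate min/max and null counts per batch, merge partial results, then encode the extrema for the column chunk metadata. A minimal sketch for an INT32 column, using only constructs introduced or shown in this patch (schema setup as in the CorruptStatistics test):

  std::vector<parquet::schema::NodePtr> fields;
  fields.push_back(parquet::schema::PrimitiveNode::Make(
      "col", parquet::Repetition::OPTIONAL, parquet::Type::INT32, parquet::LogicalType::NONE));
  parquet::SchemaDescriptor descr;
  descr.Init(parquet::schema::GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

  std::vector<int32_t> batch1 = {5, 1, 9};
  std::vector<int32_t> batch2 = {-3, 7};

  parquet::Int32Statistics stats1(descr.Column(0));
  stats1.Update(batch1.data(), static_cast<int64_t>(batch1.size()), /*num_null=*/1);

  parquet::Int32Statistics stats2(descr.Column(0));
  stats2.Update(batch2.data(), static_cast<int64_t>(batch2.size()), /*num_null=*/0);

  stats1.Merge(stats2);
  // stats1.min() == -3, stats1.max() == 9, stats1.null_count() == 1
  parquet::EncodedStatistics encoded = stats1.Encode();  // plain-encoded min/max + null count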

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/statistics.cc
----------------------------------------------------------------------
diff --git a/src/parquet/statistics.cc b/src/parquet/statistics.cc
new file mode 100644
index 0000000..d99140e
--- /dev/null
+++ b/src/parquet/statistics.cc
@@ -0,0 +1,244 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <algorithm>
+#include <cstring>
+
+#include "parquet/encoding-internal.h"
+#include "parquet/exception.h"
+#include "parquet/statistics.h"
+#include "parquet/util/comparison.h"
+#include "parquet/util/memory.h"
+
+using arrow::default_memory_pool;
+using arrow::MemoryPool;
+
+namespace parquet {
+
+template <typename DType>
+TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(
+    const ColumnDescriptor* schema, MemoryPool* pool)
+    : pool_(pool),
+      min_buffer_(AllocateBuffer(pool_, 0)),
+      max_buffer_(AllocateBuffer(pool_, 0)) {
+  SetDescr(schema);
+  Reset();
+}
+
+template <typename DType>
+TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_type& min,
+    const typename DType::c_type& max, int64_t num_values, int64_t null_count,
+    int64_t distinct_count)
+    : pool_(default_memory_pool()),
+      min_buffer_(AllocateBuffer(pool_, 0)),
+      max_buffer_(AllocateBuffer(pool_, 0)) {
+  IncrementNumValues(num_values);
+  IncrementNullCount(null_count);
+  IncrementDistinctCount(distinct_count);
+
+  Copy(min, &min_, min_buffer_.get());
+  Copy(max, &max_, max_buffer_.get());
+  has_min_max_ = true;
+}
+
+template <typename DType>
+TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor* schema,
+    const std::string& encoded_min, const std::string& encoded_max, int64_t num_values,
+    int64_t null_count, int64_t distinct_count, bool has_min_max, MemoryPool* pool)
+    : pool_(pool),
+      min_buffer_(AllocateBuffer(pool_, 0)),
+      max_buffer_(AllocateBuffer(pool_, 0)) {
+  IncrementNumValues(num_values);
+  IncrementNullCount(null_count);
+  IncrementDistinctCount(distinct_count);
+
+  SetDescr(schema);
+
+  if (!encoded_min.empty()) { PlainDecode(encoded_min, &min_); }
+  if (!encoded_max.empty()) { PlainDecode(encoded_max, &max_); }
+  has_min_max_ = has_min_max;
+}
+
+template <typename DType>
+bool TypedRowGroupStatistics<DType>::HasMinMax() const {
+  return has_min_max_;
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::Reset() {
+  ResetCounts();
+  has_min_max_ = false;
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::Update(
+    const T* values, int64_t num_not_null, int64_t num_null) {
+  DCHECK(num_not_null >= 0);
+  DCHECK(num_null >= 0);
+
+  IncrementNullCount(num_null);
+  IncrementNumValues(num_not_null);
+  // TODO: support distinct count?
+  if (num_not_null == 0) return;
+
+  Compare<T> compare(descr_);
+  auto batch_minmax = std::minmax_element(values, values + num_not_null, compare);
+  if (!has_min_max_) {
+    has_min_max_ = true;
+    Copy(*batch_minmax.first, &min_, min_buffer_.get());
+    Copy(*batch_minmax.second, &max_, max_buffer_.get());
+  } else {
+    Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get());
+    Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get());
+  }
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
+    const uint8_t* valid_bits, int64_t valid_bits_offset, int64_t num_not_null,
+    int64_t num_null) {
+  DCHECK(num_not_null >= 0);
+  DCHECK(num_null >= 0);
+
+  IncrementNullCount(num_null);
+  IncrementNumValues(num_not_null);
+  // TODO: support distinct count?
+  if (num_not_null == 0) return;
+
+  Compare<T> compare(descr_);
+  INIT_BITSET(valid_bits, static_cast<int>(valid_bits_offset));
+  // Find first valid entry and use that for min/max
+  // As (num_not_null != 0) there must be one
+  int64_t length = num_null + num_not_null;
+  int64_t i = 0;
+  for (; i < length; i++) {
+    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { break; }
+    READ_NEXT_BITSET(valid_bits);
+  }
+  T min = values[i];
+  T max = values[i];
+  for (; i < length; i++) {
+    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+      if (compare(values[i], min)) {
+        min = values[i];
+      } else if (compare(max, values[i])) {
+        max = values[i];
+      }
+    }
+    READ_NEXT_BITSET(valid_bits);
+  }
+  if (!has_min_max_) {
+    has_min_max_ = true;
+    Copy(min, &min_, min_buffer_.get());
+    Copy(max, &max_, max_buffer_.get());
+  } else {
+    Copy(std::min(min_, min, compare), &min_, min_buffer_.get());
+    Copy(std::max(max_, max, compare), &max_, max_buffer_.get());
+  }
+}
+
+template <typename DType>
+const typename DType::c_type& TypedRowGroupStatistics<DType>::min() const {
+  return min_;
+}
+
+template <typename DType>
+const typename DType::c_type& TypedRowGroupStatistics<DType>::max() const {
+  return max_;
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::Merge(const TypedRowGroupStatistics<DType>& other) {
+  this->MergeCounts(other);
+
+  if (!other.HasMinMax()) return;
+
+  if (!has_min_max_) {
+    Copy(other.min_, &this->min_, min_buffer_.get());
+    Copy(other.max_, &this->max_, max_buffer_.get());
+    has_min_max_ = true;
+    return;
+  }
+
+  Compare<T> compare(descr_);
+  Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get());
+  Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get());
+}
+
+template <typename DType>
+std::string TypedRowGroupStatistics<DType>::EncodeMin() {
+  std::string s;
+  if (HasMinMax()) this->PlainEncode(min_, &s);
+  return s;
+}
+
+template <typename DType>
+std::string TypedRowGroupStatistics<DType>::EncodeMax() {
+  std::string s;
+  if (HasMinMax()) this->PlainEncode(max_, &s);
+  return s;
+}
+
+template <typename DType>
+EncodedStatistics TypedRowGroupStatistics<DType>::Encode() {
+  EncodedStatistics s;
+  if (HasMinMax()) {
+    s.set_min(this->EncodeMin());
+    s.set_max(this->EncodeMax());
+  }
+  s.set_null_count(this->null_count());
+  return s;
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::PlainEncode(const T& src, std::string* dst) {
+  PlainEncoder<DType> encoder(descr(), pool_);
+  encoder.Put(&src, 1);
+  auto buffer = encoder.FlushValues();
+  auto ptr = reinterpret_cast<const char*>(buffer->data());
+  dst->assign(ptr, buffer->size());
+}
+
+template <typename DType>
+void TypedRowGroupStatistics<DType>::PlainDecode(const std::string& src, T* dst) {
+  PlainDecoder<DType> decoder(descr());
+  decoder.SetData(
+      1, reinterpret_cast<const uint8_t*>(src.c_str()), static_cast<int>(src.size()));
+  decoder.Decode(dst, 1);
+}
+
+template <>
+void TypedRowGroupStatistics<ByteArrayType>::PlainEncode(const T& src, std::string* dst) {
+  dst->assign(reinterpret_cast<const char*>(src.ptr), src.len);
+}
+
+template <>
+void TypedRowGroupStatistics<ByteArrayType>::PlainDecode(const std::string& src, T* dst) {
+  dst->len = static_cast<uint32_t>(src.size());
+  dst->ptr = reinterpret_cast<const uint8_t*>(src.c_str());
+}
+
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<BooleanType>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int32Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int64Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int96Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FloatType>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<DoubleType>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<ByteArrayType>;
+template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FLBAType>;
+
+}  // namespace parquet
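
A note on UpdateSpaced above: the INIT_BITSET/READ_NEXT_BITSET helpers walk an LSB-first validity bitmap so that null slots interleaved with the values never contribute to min/max. A stripped-down sketch of the same scan, with explicit bit arithmetic and operator< in place of the Compare functor (illustration only, not the code path the library uses):

  // Scan 'length' spaced slots; only slots whose validity bit is set participate.
  // Returns false when no valid slot was found.
  template <typename T>
  bool SpacedMinMax(const T* values, const uint8_t* valid_bits,
                    int64_t valid_bits_offset, int64_t length, T* min_out, T* max_out) {
    bool seen = false;
    for (int64_t i = 0; i < length; ++i) {
      int64_t bit = valid_bits_offset + i;
      if (((valid_bits[bit / 8] >> (bit % 8)) & 1) == 0) continue;  // null slot
      if (!seen) {
        *min_out = *max_out = values[i];
        seen = true;
      } else if (values[i] < *min_out) {
        *min_out = values[i];
      } else if (*max_out < values[i]) {
        *max_out = values[i];
      }
    }
    return seen;
  }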

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/statistics.h
----------------------------------------------------------------------
diff --git a/src/parquet/statistics.h b/src/parquet/statistics.h
new file mode 100644
index 0000000..c6a2487
--- /dev/null
+++ b/src/parquet/statistics.h
@@ -0,0 +1,234 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_STATISTICS_H
+#define PARQUET_COLUMN_STATISTICS_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+
+class PARQUET_EXPORT EncodedStatistics {
+  std::shared_ptr<std::string> max_, min_;
+
+ public:
+  EncodedStatistics()
+      : max_(std::make_shared<std::string>()), min_(std::make_shared<std::string>()) {}
+
+  const std::string& max() const { return *max_; }
+  const std::string& min() const { return *min_; }
+
+  int64_t null_count = 0;
+  int64_t distinct_count = 0;
+
+  bool has_min = false;
+  bool has_max = false;
+  bool has_null_count = false;
+  bool has_distinct_count = false;
+
+  inline bool is_set() const {
+    return has_min || has_max || has_null_count || has_distinct_count;
+  }
+
+  inline EncodedStatistics& set_max(const std::string& value) {
+    *max_ = value;
+    has_max = true;
+    return *this;
+  }
+
+  inline EncodedStatistics& set_min(const std::string& value) {
+    *min_ = value;
+    has_min = true;
+    return *this;
+  }
+
+  inline EncodedStatistics& set_null_count(int64_t value) {
+    null_count = value;
+    has_null_count = true;
+    return *this;
+  }
+
+  inline EncodedStatistics& set_distinct_count(int64_t value) {
+    distinct_count = value;
+    has_distinct_count = true;
+    return *this;
+  }
+};
+
+template <typename DType>
+class PARQUET_EXPORT TypedRowGroupStatistics;
+
+class PARQUET_EXPORT RowGroupStatistics
+    : public std::enable_shared_from_this<RowGroupStatistics> {
+ public:
+  int64_t null_count() const { return statistics_.null_count; }
+  int64_t distinct_count() const { return statistics_.distinct_count; }
+  int64_t num_values() const { return num_values_; }
+
+  virtual bool HasMinMax() const = 0;
+  virtual void Reset() = 0;
+
+  // Plain-encoded minimum value
+  virtual std::string EncodeMin() = 0;
+
+  // Plain-encoded maximum value
+  virtual std::string EncodeMax() = 0;
+
+  virtual EncodedStatistics Encode() = 0;
+
+  virtual ~RowGroupStatistics() {}
+
+  Type::type physical_type() const { return descr_->physical_type(); }
+
+ protected:
+  const ColumnDescriptor* descr() const { return descr_; }
+  void SetDescr(const ColumnDescriptor* schema) { descr_ = schema; }
+
+  void IncrementNullCount(int64_t n) { statistics_.null_count += n; }
+
+  void IncrementNumValues(int64_t n) { num_values_ += n; }
+
+  void IncrementDistinctCount(int64_t n) { statistics_.distinct_count += n; }
+
+  void MergeCounts(const RowGroupStatistics& other) {
+    this->statistics_.null_count += other.statistics_.null_count;
+    this->statistics_.distinct_count += other.statistics_.distinct_count;
+    this->num_values_ += other.num_values_;
+  }
+
+  void ResetCounts() {
+    this->statistics_.null_count = 0;
+    this->statistics_.distinct_count = 0;
+    this->num_values_ = 0;
+  }
+
+  const ColumnDescriptor* descr_ = nullptr;
+  int64_t num_values_ = 0;
+  EncodedStatistics statistics_;
+};
+
+template <typename DType>
+class TypedRowGroupStatistics : public RowGroupStatistics {
+ public:
+  using T = typename DType::c_type;
+
+  TypedRowGroupStatistics(const ColumnDescriptor* schema,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  TypedRowGroupStatistics(const T& min, const T& max, int64_t num_values,
+      int64_t null_count, int64_t distinct_count);
+
+  TypedRowGroupStatistics(const ColumnDescriptor* schema, const std::string& encoded_min,
+      const std::string& encoded_max, int64_t num_values, int64_t null_count,
+      int64_t distinct_count, bool has_min_max,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  bool HasMinMax() const override;
+  void Reset() override;
+  void Merge(const TypedRowGroupStatistics<DType>& other);
+
+  void Update(const T* values, int64_t num_not_null, int64_t num_null);
+  void UpdateSpaced(const T* values, const uint8_t* valid_bits, int64_t valid_bits_spaced,
+      int64_t num_not_null, int64_t num_null);
+
+  const T& min() const;
+  const T& max() const;
+
+  std::string EncodeMin() override;
+  std::string EncodeMax() override;
+  EncodedStatistics Encode() override;
+
+ private:
+  bool has_min_max_ = false;
+  T min_;
+  T max_;
+  ::arrow::MemoryPool* pool_;
+
+  void PlainEncode(const T& src, std::string* dst);
+  void PlainDecode(const std::string& src, T* dst);
+  void Copy(const T& src, T* dst, PoolBuffer* buffer);
+
+  std::shared_ptr<PoolBuffer> min_buffer_, max_buffer_;
+};
+
+template <typename DType>
+inline void TypedRowGroupStatistics<DType>::Copy(const T& src, T* dst, PoolBuffer*) {
+  *dst = src;
+}
+
+template <>
+inline void TypedRowGroupStatistics<FLBAType>::Copy(
+    const FLBA& src, FLBA* dst, PoolBuffer* buffer) {
+  if (dst->ptr == src.ptr) return;
+  uint32_t len = descr_->type_length();
+  PARQUET_THROW_NOT_OK(buffer->Resize(len, false));
+  std::memcpy(buffer->mutable_data(), src.ptr, len);
+  *dst = FLBA(buffer->data());
+}
+
+template <>
+inline void TypedRowGroupStatistics<ByteArrayType>::Copy(
+    const ByteArray& src, ByteArray* dst, PoolBuffer* buffer) {
+  if (dst->ptr == src.ptr) return;
+  PARQUET_THROW_NOT_OK(buffer->Resize(src.len, false));
+  std::memcpy(buffer->mutable_data(), src.ptr, src.len);
+  *dst = ByteArray(src.len, buffer->data());
+}
+
+template <>
+void TypedRowGroupStatistics<ByteArrayType>::PlainEncode(const T& src, std::string* dst);
+
+template <>
+void TypedRowGroupStatistics<ByteArrayType>::PlainDecode(const std::string& src, T* dst);
+
+typedef TypedRowGroupStatistics<BooleanType> BoolStatistics;
+typedef TypedRowGroupStatistics<Int32Type> Int32Statistics;
+typedef TypedRowGroupStatistics<Int64Type> Int64Statistics;
+typedef TypedRowGroupStatistics<Int96Type> Int96Statistics;
+typedef TypedRowGroupStatistics<FloatType> FloatStatistics;
+typedef TypedRowGroupStatistics<DoubleType> DoubleStatistics;
+typedef TypedRowGroupStatistics<ByteArrayType> ByteArrayStatistics;
+typedef TypedRowGroupStatistics<FLBAType> FLBAStatistics;
+
+#if defined(__GNUC__) && !defined(__clang__)
+#pragma GCC diagnostic push
+#pragma GCC diagnostic ignored "-Wattributes"
+#endif
+
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<BooleanType>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int32Type>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int64Type>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int96Type>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<FloatType>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<DoubleType>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<ByteArrayType>;
+PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<FLBAType>;
+
+#if defined(__GNUC__) && !defined(__clang__)
+#pragma GCC diagnostic pop
+#endif
+
+}  // namespace parquet
+
+#endif  // PARQUET_COLUMN_STATISTICS_H
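
EncodedStatistics above keeps has_min/has_max/has_null_count/has_distinct_count separate from the values themselves, so an absent statistic is distinguishable from an empty string or a zero count, and each setter returns *this so calls chain. A small illustrative sketch (the byte strings are just example plain-encoded payloads, not produced by this code):

  parquet::EncodedStatistics s;
  s.set_min(std::string("\x01\x00\x00\x00", 4))     // e.g. plain-encoded int32 minimum
      .set_max(std::string("\x09\x00\x00\x00", 4))  // e.g. plain-encoded int32 maximum
      .set_null_count(3);
  // s.has_min, s.has_max and s.has_null_count are now true, hence s.is_set() == true;
  // s.has_distinct_count stays false because no distinct count was recorded.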

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/test-specialization.h
----------------------------------------------------------------------
diff --git a/src/parquet/test-specialization.h b/src/parquet/test-specialization.h
new file mode 100644
index 0000000..6bd1dee
--- /dev/null
+++ b/src/parquet/test-specialization.h
@@ -0,0 +1,172 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines the InitValues specializations for the various physical
+// types and the PrimitiveTypedTest fixture built on top of them, shared by the
+// typed column and statistics tests.
+
+#ifndef PARQUET_COLUMN_TEST_SPECIALIZATION_H
+#define PARQUET_COLUMN_TEST_SPECIALIZATION_H
+
+#include <algorithm>
+#include <limits>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "parquet/test-util.h"
+
+namespace parquet {
+
+namespace test {
+
+template <>
+void InitValues<bool>(int num_values, vector<bool>& values, vector<uint8_t>& buffer) {
+  values = flip_coins(num_values, 0);
+}
+
+template <>
+void InitValues<ByteArray>(
+    int num_values, vector<ByteArray>& values, vector<uint8_t>& buffer) {
+  int max_byte_array_len = 12;
+  int num_bytes = max_byte_array_len + sizeof(uint32_t);
+  size_t nbytes = num_values * num_bytes;
+  buffer.resize(nbytes);
+  random_byte_array(num_values, 0, buffer.data(), values.data(), max_byte_array_len);
+}
+
+template <>
+void InitValues<FLBA>(int num_values, vector<FLBA>& values, vector<uint8_t>& buffer) {
+  size_t nbytes = num_values * FLBA_LENGTH;
+  buffer.resize(nbytes);
+  random_fixed_byte_array(num_values, 0, buffer.data(), FLBA_LENGTH, values.data());
+}
+
+template <>
+void InitValues<Int96>(int num_values, vector<Int96>& values, vector<uint8_t>& buffer) {
+  random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(),
+      std::numeric_limits<int32_t>::max(), values.data());
+}
+
+inline std::string TestColumnName(int i) {
+  std::stringstream col_name;
+  col_name << "column_" << i;
+  return col_name.str();
+}
+
+// This class lives here because of its dependency on the InitValues specializations.
+template <typename TestType>
+class PrimitiveTypedTest : public ::testing::Test {
+ public:
+  typedef typename TestType::c_type T;
+
+  void SetUpSchema(Repetition::type repetition, int num_columns = 1) {
+    std::vector<schema::NodePtr> fields;
+
+    for (int i = 0; i < num_columns; ++i) {
+      std::string name = TestColumnName(i);
+      fields.push_back(schema::PrimitiveNode::Make(
+          name, repetition, TestType::type_num, LogicalType::NONE, FLBA_LENGTH));
+    }
+    node_ = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
+    schema_.Init(node_);
+  }
+
+  void GenerateData(int64_t num_values);
+  void SetupValuesOut(int64_t num_values);
+  void SyncValuesOut();
+
+ protected:
+  schema::NodePtr node_;
+  SchemaDescriptor schema_;
+
+  // Input buffers
+  std::vector<T> values_;
+
+  std::vector<int16_t> def_levels_;
+
+  std::vector<uint8_t> buffer_;
+  // Pointer to the values, needed as we cannot use vector<bool>::data()
+  T* values_ptr_;
+  std::vector<uint8_t> bool_buffer_;
+
+  // Output buffers
+  std::vector<T> values_out_;
+  std::vector<uint8_t> bool_buffer_out_;
+  T* values_out_ptr_;
+};
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::SyncValuesOut() {}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::SyncValuesOut() {
+  std::vector<uint8_t>::const_iterator source_iterator = bool_buffer_out_.begin();
+  std::vector<T>::iterator destination_iterator = values_out_.begin();
+  while (source_iterator != bool_buffer_out_.end()) {
+    *destination_iterator++ = *source_iterator++ != 0;
+  }
+}
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::SetupValuesOut(int64_t num_values) {
+  values_out_.clear();
+  values_out_.resize(num_values);
+  values_out_ptr_ = values_out_.data();
+}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::SetupValuesOut(int64_t num_values) {
+  values_out_.clear();
+  values_out_.resize(num_values);
+
+  bool_buffer_out_.clear();
+  bool_buffer_out_.resize(num_values);
+  // Write once to all values so we can copy it without getting Valgrind errors
+  // about uninitialised values.
+  std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true);
+  values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data());
+}
+
+template <typename TestType>
+void PrimitiveTypedTest<TestType>::GenerateData(int64_t num_values) {
+  def_levels_.resize(num_values);
+  values_.resize(num_values);
+
+  InitValues<T>(static_cast<int>(num_values), values_, buffer_);
+  values_ptr_ = values_.data();
+
+  std::fill(def_levels_.begin(), def_levels_.end(), 1);
+}
+
+template <>
+void PrimitiveTypedTest<BooleanType>::GenerateData(int64_t num_values) {
+  def_levels_.resize(num_values);
+  values_.resize(num_values);
+
+  InitValues<T>(static_cast<int>(num_values), values_, buffer_);
+  bool_buffer_.resize(num_values);
+  std::copy(values_.begin(), values_.end(), bool_buffer_.begin());
+  values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data());
+
+  std::fill(def_levels_.begin(), def_levels_.end(), 1);
+}
+}  // namespace test
+
+}  // namespace parquet
+
+#endif  // PARQUET_COLUMN_TEST_SPECIALIZATION_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
new file mode 100644
index 0000000..8657a7f
--- /dev/null
+++ b/src/parquet/test-util.h
@@ -0,0 +1,430 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#ifndef PARQUET_COLUMN_TEST_UTIL_H
+#define PARQUET_COLUMN_TEST_UTIL_H
+
+#include <algorithm>
+#include <limits>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/column_writer.h"
+#include "parquet/encoding-internal.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/test-common.h"
+
+using std::vector;
+using std::shared_ptr;
+
+namespace parquet {
+
+static int FLBA_LENGTH = 12;
+
+bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
+  return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
+}
+
+namespace test {
+
+template <typename T>
+static void InitValues(int num_values, vector<T>& values, vector<uint8_t>& buffer) {
+  random_numbers(num_values, 0, std::numeric_limits<T>::min(),
+      std::numeric_limits<T>::max(), values.data());
+}
+
+template <typename T>
+static void InitDictValues(
+    int num_values, int num_dicts, vector<T>& values, vector<uint8_t>& buffer) {
+  int repeat_factor = num_values / num_dicts;
+  InitValues<T>(num_dicts, values, buffer);
+  // add some repeated values
+  for (int j = 1; j < repeat_factor; ++j) {
+    for (int i = 0; i < num_dicts; ++i) {
+      std::memcpy(&values[num_dicts * j + i], &values[i], sizeof(T));
+    }
+  }
+  // only num_dicts * repeat_factor entries (at most num_values) are filled so
+  // far; fill any remainder by repeating earlier values
+  for (int i = num_dicts * repeat_factor; i < num_values; ++i) {
+    std::memcpy(&values[i], &values[i - num_dicts * repeat_factor], sizeof(T));
+  }
+}
+
+class MockPageReader : public PageReader {
+ public:
+  explicit MockPageReader(const vector<shared_ptr<Page>>& pages)
+      : pages_(pages), page_index_(0) {}
+
+  // Implement the PageReader interface
+  virtual shared_ptr<Page> NextPage() {
+    if (page_index_ == static_cast<int>(pages_.size())) {
+      // EOS to consumer
+      return shared_ptr<Page>(nullptr);
+    }
+    return pages_[page_index_++];
+  }
+
+ private:
+  vector<shared_ptr<Page>> pages_;
+  int page_index_;
+};
+
+// TODO(wesm): this is only used for testing for now. Refactor to form part of
+// primary file write path
+template <typename Type>
+class DataPageBuilder {
+ public:
+  typedef typename Type::c_type T;
+
+  // This class writes data and metadata to the passed inputs
+  explicit DataPageBuilder(InMemoryOutputStream* sink)
+      : sink_(sink),
+        num_values_(0),
+        encoding_(Encoding::PLAIN),
+        definition_level_encoding_(Encoding::RLE),
+        repetition_level_encoding_(Encoding::RLE),
+        have_def_levels_(false),
+        have_rep_levels_(false),
+        have_values_(false) {}
+
+  void AppendDefLevels(const vector<int16_t>& levels, int16_t max_level,
+      Encoding::type encoding = Encoding::RLE) {
+    AppendLevels(levels, max_level, encoding);
+
+    num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
+    definition_level_encoding_ = encoding;
+    have_def_levels_ = true;
+  }
+
+  void AppendRepLevels(const vector<int16_t>& levels, int16_t max_level,
+      Encoding::type encoding = Encoding::RLE) {
+    AppendLevels(levels, max_level, encoding);
+
+    num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
+    repetition_level_encoding_ = encoding;
+    have_rep_levels_ = true;
+  }
+
+  void AppendValues(const ColumnDescriptor* d, const vector<T>& values,
+      Encoding::type encoding = Encoding::PLAIN) {
+    PlainEncoder<Type> encoder(d);
+    encoder.Put(&values[0], static_cast<int>(values.size()));
+    std::shared_ptr<Buffer> values_sink = encoder.FlushValues();
+    sink_->Write(values_sink->data(), values_sink->size());
+
+    num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
+    encoding_ = encoding;
+    have_values_ = true;
+  }
+
+  int32_t num_values() const { return num_values_; }
+
+  Encoding::type encoding() const { return encoding_; }
+
+  Encoding::type rep_level_encoding() const { return repetition_level_encoding_; }
+
+  Encoding::type def_level_encoding() const { return definition_level_encoding_; }
+
+ private:
+  InMemoryOutputStream* sink_;
+
+  int32_t num_values_;
+  Encoding::type encoding_;
+  Encoding::type definition_level_encoding_;
+  Encoding::type repetition_level_encoding_;
+
+  bool have_def_levels_;
+  bool have_rep_levels_;
+  bool have_values_;
+
+  // Used internally for both repetition and definition levels
+  void AppendLevels(
+      const vector<int16_t>& levels, int16_t max_level, Encoding::type encoding) {
+    if (encoding != Encoding::RLE) {
+      ParquetException::NYI("only rle encoding currently implemented");
+    }
+
+    // TODO: compute a more precise maximum size for the encoded levels
+    vector<uint8_t> encode_buffer(levels.size() * 2);
+
+    // We encode into separate memory from the output stream because the
+    // RLE-encoded bytes have to be preceded in the stream by their absolute
+    // size.
+    LevelEncoder encoder;
+    encoder.Init(encoding, max_level, static_cast<int>(levels.size()),
+        encode_buffer.data(), static_cast<int>(encode_buffer.size()));
+
+    encoder.Encode(static_cast<int>(levels.size()), levels.data());
+
+    int32_t rle_bytes = encoder.len();
+    sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t));
+    sink_->Write(encode_buffer.data(), rle_bytes);
+  }
+};
+
+template <>
+void DataPageBuilder<BooleanType>::AppendValues(
+    const ColumnDescriptor* d, const vector<bool>& values, Encoding::type encoding) {
+  if (encoding != Encoding::PLAIN) {
+    ParquetException::NYI("only plain encoding currently implemented");
+  }
+  PlainEncoder<BooleanType> encoder(d);
+  encoder.Put(values, static_cast<int>(values.size()));
+  std::shared_ptr<Buffer> buffer = encoder.FlushValues();
+  sink_->Write(buffer->data(), buffer->size());
+
+  num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
+  encoding_ = encoding;
+  have_values_ = true;
+}
+
+template <typename Type>
+static shared_ptr<DataPage> MakeDataPage(const ColumnDescriptor* d,
+    const vector<typename Type::c_type>& values, int num_vals, Encoding::type encoding,
+    const uint8_t* indices, int indices_size, const vector<int16_t>& def_levels,
+    int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level) {
+  int num_values = 0;
+
+  InMemoryOutputStream page_stream;
+  test::DataPageBuilder<Type> page_builder(&page_stream);
+
+  if (!rep_levels.empty()) { page_builder.AppendRepLevels(rep_levels, max_rep_level); }
+  if (!def_levels.empty()) { page_builder.AppendDefLevels(def_levels, max_def_level); }
+
+  if (encoding == Encoding::PLAIN) {
+    page_builder.AppendValues(d, values, encoding);
+    num_values = page_builder.num_values();
+  } else {  // DICTIONARY PAGES
+    page_stream.Write(indices, indices_size);
+    num_values = std::max(page_builder.num_values(), num_vals);
+  }
+
+  auto buffer = page_stream.GetBuffer();
+
+  return std::make_shared<DataPage>(buffer, num_values, encoding,
+      page_builder.def_level_encoding(), page_builder.rep_level_encoding());
+}
+
+template <typename TYPE>
+class DictionaryPageBuilder {
+ public:
+  typedef typename TYPE::c_type TC;
+  static constexpr int TN = TYPE::type_num;
+
+  // This class writes data and metadata to the passed inputs
+  explicit DictionaryPageBuilder(const ColumnDescriptor* d)
+      : num_dict_values_(0), have_values_(false) {
+    encoder_.reset(new DictEncoder<TYPE>(d, &pool_));
+  }
+
+  ~DictionaryPageBuilder() { pool_.FreeAll(); }
+
+  shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
+    int num_values = static_cast<int>(values.size());
+    // Dictionary encoding
+    encoder_->Put(values.data(), num_values);
+    num_dict_values_ = encoder_->num_entries();
+    have_values_ = true;
+    return encoder_->FlushValues();
+  }
+
+  shared_ptr<Buffer> WriteDict() {
+    std::shared_ptr<PoolBuffer> dict_buffer =
+        AllocateBuffer(::arrow::default_memory_pool(), encoder_->dict_encoded_size());
+    encoder_->WriteDict(dict_buffer->mutable_data());
+    return dict_buffer;
+  }
+
+  int32_t num_values() const { return num_dict_values_; }
+
+ private:
+  ChunkedAllocator pool_;
+  shared_ptr<DictEncoder<TYPE>> encoder_;
+  int32_t num_dict_values_;
+  bool have_values_;
+};
+
+template <>
+DictionaryPageBuilder<BooleanType>::DictionaryPageBuilder(const ColumnDescriptor* d) {
+  ParquetException::NYI("only plain encoding currently implemented for boolean");
+}
+
+template <>
+shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::WriteDict() {
+  ParquetException::NYI("only plain encoding currently implemented for boolean");
+  return nullptr;
+}
+
+template <>
+shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::AppendValues(
+    const vector<TC>& values) {
+  ParquetException::NYI("only plain encoding currently implemented for boolean");
+  return nullptr;
+}
+
+template <typename Type>
+static shared_ptr<DictionaryPage> MakeDictPage(const ColumnDescriptor* d,
+    const vector<typename Type::c_type>& values, const vector<int>& values_per_page,
+    Encoding::type encoding, vector<shared_ptr<Buffer>>& rle_indices) {
+  InMemoryOutputStream page_stream;
+  test::DictionaryPageBuilder<Type> page_builder(d);
+  int num_pages = static_cast<int>(values_per_page.size());
+  int value_start = 0;
+
+  for (int i = 0; i < num_pages; i++) {
+    rle_indices.push_back(page_builder.AppendValues(
+        slice(values, value_start, value_start + values_per_page[i])));
+    value_start += values_per_page[i];
+  }
+
+  auto buffer = page_builder.WriteDict();
+
+  return std::make_shared<DictionaryPage>(
+      buffer, page_builder.num_values(), Encoding::PLAIN);
+}
+
+// Given def/rep levels and values create multiple dict pages
+template <typename Type>
+static void PaginateDict(const ColumnDescriptor* d,
+    const vector<typename Type::c_type>& values, const vector<int16_t>& def_levels,
+    int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level,
+    int num_levels_per_page, const vector<int>& values_per_page,
+    vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::RLE_DICTIONARY) {
+  int num_pages = static_cast<int>(values_per_page.size());
+  vector<shared_ptr<Buffer>> rle_indices;
+  shared_ptr<DictionaryPage> dict_page =
+      MakeDictPage<Type>(d, values, values_per_page, encoding, rle_indices);
+  pages.push_back(dict_page);
+  int def_level_start = 0;
+  int def_level_end = 0;
+  int rep_level_start = 0;
+  int rep_level_end = 0;
+  for (int i = 0; i < num_pages; i++) {
+    if (max_def_level > 0) {
+      def_level_start = i * num_levels_per_page;
+      def_level_end = (i + 1) * num_levels_per_page;
+    }
+    if (max_rep_level > 0) {
+      rep_level_start = i * num_levels_per_page;
+      rep_level_end = (i + 1) * num_levels_per_page;
+    }
+    shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(d, {}, values_per_page[i],
+        encoding, rle_indices[i]->data(), static_cast<int>(rle_indices[i]->size()),
+        slice(def_levels, def_level_start, def_level_end), max_def_level,
+        slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
+    pages.push_back(data_page);
+  }
+}
+
+// Given def/rep levels and values create multiple plain pages
+template <typename Type>
+static void PaginatePlain(const ColumnDescriptor* d,
+    const vector<typename Type::c_type>& values, const vector<int16_t>& def_levels,
+    int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level,
+    int num_levels_per_page, const vector<int>& values_per_page,
+    vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::PLAIN) {
+  int num_pages = static_cast<int>(values_per_page.size());
+  int def_level_start = 0;
+  int def_level_end = 0;
+  int rep_level_start = 0;
+  int rep_level_end = 0;
+  int value_start = 0;
+  for (int i = 0; i < num_pages; i++) {
+    if (max_def_level > 0) {
+      def_level_start = i * num_levels_per_page;
+      def_level_end = (i + 1) * num_levels_per_page;
+    }
+    if (max_rep_level > 0) {
+      rep_level_start = i * num_levels_per_page;
+      rep_level_end = (i + 1) * num_levels_per_page;
+    }
+    shared_ptr<DataPage> page = MakeDataPage<Type>(d,
+        slice(values, value_start, value_start + values_per_page[i]), values_per_page[i],
+        encoding, NULL, 0, slice(def_levels, def_level_start, def_level_end),
+        max_def_level, slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
+    pages.push_back(page);
+    value_start += values_per_page[i];
+  }
+}
+
+// Generates pages from randomly generated data
+template <typename Type>
+static int MakePages(const ColumnDescriptor* d, int num_pages, int levels_per_page,
+    vector<int16_t>& def_levels, vector<int16_t>& rep_levels,
+    vector<typename Type::c_type>& values, vector<uint8_t>& buffer,
+    vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::PLAIN) {
+  int num_levels = levels_per_page * num_pages;
+  int num_values = 0;
+  uint32_t seed = 0;
+  int16_t zero = 0;
+  int16_t max_def_level = d->max_definition_level();
+  int16_t max_rep_level = d->max_repetition_level();
+  vector<int> values_per_page(num_pages, levels_per_page);
+  // Create definition levels
+  if (max_def_level > 0) {
+    def_levels.resize(num_levels);
+    random_numbers(num_levels, seed, zero, max_def_level, def_levels.data());
+    for (int p = 0; p < num_pages; p++) {
+      int num_values_per_page = 0;
+      for (int i = 0; i < levels_per_page; i++) {
+        if (def_levels[i + p * levels_per_page] == max_def_level) {
+          num_values_per_page++;
+          num_values++;
+        }
+      }
+      values_per_page[p] = num_values_per_page;
+    }
+  } else {
+    num_values = num_levels;
+  }
+  // Create repetition levels
+  if (max_rep_level > 0) {
+    rep_levels.resize(num_levels);
+    random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data());
+  }
+  // Create values
+  values.resize(num_values);
+  if (encoding == Encoding::PLAIN) {
+    InitValues<typename Type::c_type>(num_values, values, buffer);
+    PaginatePlain<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level,
+        levels_per_page, values_per_page, pages);
+  } else if (encoding == Encoding::RLE_DICTIONARY ||
+             encoding == Encoding::PLAIN_DICTIONARY) {
+    // Calls InitValues and repeats the data
+    InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer);
+    PaginateDict<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level,
+        levels_per_page, values_per_page, pages);
+  }
+
+  return num_values;
+}
+
+}  // namespace test
+
+}  // namespace parquet
+
+#endif  // PARQUET_COLUMN_TEST_UTIL_H
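
MakePages above ties the helpers together: it fabricates definition/repetition levels, generates values, and paginates them into plain or dictionary-encoded pages that a MockPageReader can then feed to a reader under test. A hedged sketch of a typical call for one OPTIONAL INT32 column; the direct ColumnDescriptor(node, max_def_level, max_rep_level) construction is assumed from the existing library headers rather than introduced by this patch:

  using parquet::schema::NodePtr;
  using parquet::schema::PrimitiveNode;

  // One OPTIONAL INT32 leaf: max definition level 1, no repetition.
  NodePtr pnode = PrimitiveNode::Make("col", parquet::Repetition::OPTIONAL,
      parquet::Type::INT32, parquet::LogicalType::NONE);
  parquet::ColumnDescriptor descr(pnode, /*max_definition_level=*/1,
      /*max_repetition_level=*/0);

  std::vector<int16_t> def_levels, rep_levels;
  std::vector<int32_t> values;
  std::vector<uint8_t> buffer;
  std::vector<std::shared_ptr<parquet::Page>> pages;

  int num_non_null = parquet::test::MakePages<parquet::Int32Type>(&descr,
      /*num_pages=*/3, /*levels_per_page=*/100, def_levels, rep_levels, values,
      buffer, pages, parquet::Encoding::PLAIN);

  std::unique_ptr<parquet::PageReader> pager(new parquet::test::MockPageReader(pages));
  // num_non_null counts the slots whose definition level reached the maximum,
  // i.e. the entries actually present in 'values'.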

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/util/memory.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index 4f780c4..2e7eb0f 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -46,20 +46,16 @@ static inline std::unique_ptr<::arrow::Codec> GetCodecFromArrow(Compression::typ
     case Compression::UNCOMPRESSED:
       break;
     case Compression::SNAPPY:
-      PARQUET_THROW_NOT_OK(
-          ::arrow::Codec::Create(::arrow::Compression::SNAPPY, &result));
+      PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::SNAPPY, &result));
       break;
     case Compression::GZIP:
-      PARQUET_THROW_NOT_OK(
-          ::arrow::Codec::Create(::arrow::Compression::GZIP, &result));
+      PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::GZIP, &result));
       break;
     case Compression::LZO:
-      PARQUET_THROW_NOT_OK(
-          ::arrow::Codec::Create(::arrow::Compression::LZO, &result));
+      PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::LZO, &result));
       break;
     case Compression::BROTLI:
-      PARQUET_THROW_NOT_OK(
-          ::arrow::Codec::Create(::arrow::Compression::BROTLI, &result));
+      PARQUET_THROW_NOT_OK(::arrow::Codec::Create(::arrow::Compression::BROTLI, &result));
       break;
     default:
       break;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/util/schema-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/schema-util.h b/src/parquet/util/schema-util.h
index 618d21e..e199c21 100644
--- a/src/parquet/util/schema-util.h
+++ b/src/parquet/util/schema-util.h
@@ -19,8 +19,8 @@
 #define PARQUET_SCHEMA_UTIL_H
 
 #include <string>
-#include <vector>
 #include <unordered_set>
+#include <vector>
 
 #include "parquet/exception.h"
 #include "parquet/schema.h"
@@ -43,8 +43,7 @@ inline bool str_endswith_tuple(const std::string& str) {
 //   If the name is array or ends in _tuple, this should be a list of struct
 //   even for single child elements.
 inline bool HasStructListName(const GroupNode& node) {
-  return (node.name() == "array" ||
-          str_endswith_tuple(node.name()));
+  return (node.name() == "array" || str_endswith_tuple(node.name()));
 }
 
 // TODO(itaiin): This aux. function is to be deleted once repeated structs are supported
@@ -53,8 +52,8 @@ inline bool IsSimpleStruct(const NodePtr& node) {
   if (node->is_repeated()) return false;
   if (node->logical_type() == LogicalType::LIST) return false;
   // Special case mentioned in the format spec:
-    //   If the name is array or ends in _tuple, this should be a list of struct
-    //   even for single child elements.
+  //   If the name is array or ends in _tuple, this should be a list of struct
+  //   even for single child elements.
   auto group = static_cast<const GroupNode*>(node.get());
   if (group->field_count() == 1 && HasStructListName(*group)) return false;
 
@@ -71,9 +70,7 @@ inline bool ColumnIndicesToFieldIndices(const SchemaDescriptor& descr,
   for (auto& column_idx : column_indices) {
     auto field_node = descr.GetColumnRoot(column_idx);
     auto field_idx = group->FieldIndex(field_node->name());
-    if (field_idx < 0) {
-      return false;
-    }
+    if (field_idx < 0) { return false; }
     auto insertion = already_added.insert(field_idx);
     if (insertion.second) { out->push_back(field_idx); }
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/util/visibility.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/visibility.h b/src/parquet/util/visibility.h
index 1601335..984fac2 100644
--- a/src/parquet/util/visibility.h
+++ b/src/parquet/util/visibility.h
@@ -21,7 +21,8 @@
 #if defined(_WIN32) || defined(__CYGWIN__)
 #ifdef _MSC_VER
 #pragma warning(push)
-// Disable warning for STL types usage in DLL interface https://web.archive.org/web/20130317015847/http://connect.microsoft.com/VisualStudio/feedback/details/696593/vc-10-vs-2010-basic-string-exports
+// Disable warning for STL types usage in DLL interface
+// https://web.archive.org/web/20130317015847/http://connect.microsoft.com/VisualStudio/feedback/details/696593/vc-10-vs-2010-basic-string-exports
 #pragma warning(disable : 4275 4251)
 // Disable diamond inheritance warnings
 #pragma warning(disable : 4250)

