parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject [4/6] parquet-cpp git commit: PARQUET-858: Flatten column directory, minor code consolidation
Date Mon, 26 Jun 2017 07:05:27 GMT
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
deleted file mode 100644
index a9b83c1..0000000
--- a/src/parquet/column/scanner.h
+++ /dev/null
@@ -1,232 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COLUMN_SCANNER_H
-#define PARQUET_COLUMN_SCANNER_H
-
-#include <cstdint>
-#include <memory>
-#include <ostream>
-#include <stdio.h>
-#include <string>
-#include <vector>
-
-#include "parquet/column/reader.h"
-#include "parquet/exception.h"
-#include "parquet/schema.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
-
-class PARQUET_EXPORT Scanner {
- public:
-  explicit Scanner(std::shared_ptr<ColumnReader> reader,
-      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
-      : batch_size_(batch_size),
-        level_offset_(0),
-        levels_buffered_(0),
-        value_buffer_(std::make_shared<PoolBuffer>(pool)),
-        value_offset_(0),
-        values_buffered_(0),
-        reader_(reader) {
-    def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0);
-    rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0);
-  }
-
-  virtual ~Scanner() {}
-
-  static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
-      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
-  virtual void PrintNext(std::ostream& out, int width) = 0;
-
-  bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); }
-
-  const ColumnDescriptor* descr() const { return reader_->descr(); }
-
-  int64_t batch_size() const { return batch_size_; }
-
-  void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; }
-
- protected:
-  int64_t batch_size_;
-
-  std::vector<int16_t> def_levels_;
-  std::vector<int16_t> rep_levels_;
-  int level_offset_;
-  int levels_buffered_;
-
-  std::shared_ptr<PoolBuffer> value_buffer_;
-  int value_offset_;
-  int64_t values_buffered_;
-
- private:
-  std::shared_ptr<ColumnReader> reader_;
-};
-
-template <typename DType>
-class PARQUET_EXPORT TypedScanner : public Scanner {
- public:
-  typedef typename DType::c_type T;
-
-  explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
-      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
-      : Scanner(reader, batch_size, pool) {
-    typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get());
-    int value_byte_size = type_traits<DType::type_num>::value_byte_size;
-    PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
-    values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
-  }
-
-  virtual ~TypedScanner() {}
-
-  bool NextLevels(int16_t* def_level, int16_t* rep_level) {
-    if (level_offset_ == levels_buffered_) {
-      levels_buffered_ =
-          static_cast<int>(typed_reader_->ReadBatch(static_cast<int>(batch_size_),
-              def_levels_.data(), rep_levels_.data(), values_, &values_buffered_));
-
-      value_offset_ = 0;
-      level_offset_ = 0;
-      if (!levels_buffered_) { return false; }
-    }
-    *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0;
-    *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0;
-    level_offset_++;
-    return true;
-  }
-
-  bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) {
-    if (level_offset_ == levels_buffered_) {
-      if (!HasNext()) {
-        // Out of data pages
-        return false;
-      }
-    }
-
-    NextLevels(def_level, rep_level);
-    *is_null = *def_level < descr()->max_definition_level();
-
-    if (*is_null) { return true; }
-
-    if (value_offset_ == values_buffered_) {
-      throw ParquetException("Value was non-null, but has not been buffered");
-    }
-    *val = values_[value_offset_++];
-    return true;
-  }
-
-  // Returns true if there is a next value
-  bool NextValue(T* val, bool* is_null) {
-    if (level_offset_ == levels_buffered_) {
-      if (!HasNext()) {
-        // Out of data pages
-        return false;
-      }
-    }
-
-    // Out of values
-    int16_t def_level = -1;
-    int16_t rep_level = -1;
-    NextLevels(&def_level, &rep_level);
-    *is_null = def_level < descr()->max_definition_level();
-
-    if (*is_null) { return true; }
-
-    if (value_offset_ == values_buffered_) {
-      throw ParquetException("Value was non-null, but has not been buffered");
-    }
-    *val = values_[value_offset_++];
-    return true;
-  }
-
-  virtual void PrintNext(std::ostream& out, int width) {
-    T val;
-    bool is_null = false;
-    char buffer[25];
-
-    if (!NextValue(&val, &is_null)) { throw ParquetException("No more values buffered"); }
-
-    if (is_null) {
-      std::string null_fmt = format_fwf<ByteArrayType>(width);
-      snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL");
-    } else {
-      FormatValue(&val, buffer, sizeof(buffer), width);
-    }
-    out << buffer;
-  }
-
- private:
-  // The ownership of this object is expressed through the reader_ variable in the base
-  TypedColumnReader<DType>* typed_reader_;
-
-  inline void FormatValue(void* val, char* buffer, int bufsize, int width);
-
-  T* values_;
-};
-
-template <typename DType>
-inline void TypedScanner<DType>::FormatValue(
-    void* val, char* buffer, int bufsize, int width) {
-  std::string fmt = format_fwf<DType>(width);
-  snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val));
-}
-
-template <>
-inline void TypedScanner<Int96Type>::FormatValue(
-    void* val, char* buffer, int bufsize, int width) {
-  std::string fmt = format_fwf<Int96Type>(width);
-  std::string result = Int96ToString(*reinterpret_cast<Int96*>(val));
-  snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
-}
-
-template <>
-inline void TypedScanner<ByteArrayType>::FormatValue(
-    void* val, char* buffer, int bufsize, int width) {
-  std::string fmt = format_fwf<ByteArrayType>(width);
-  std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val));
-  snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
-}
-
-template <>
-inline void TypedScanner<FLBAType>::FormatValue(
-    void* val, char* buffer, int bufsize, int width) {
-  std::string fmt = format_fwf<FLBAType>(width);
-  std::string result = FixedLenByteArrayToString(
-      *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length());
-  snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
-}
-
-typedef TypedScanner<BooleanType> BoolScanner;
-typedef TypedScanner<Int32Type> Int32Scanner;
-typedef TypedScanner<Int64Type> Int64Scanner;
-typedef TypedScanner<Int96Type> Int96Scanner;
-typedef TypedScanner<FloatType> FloatScanner;
-typedef TypedScanner<DoubleType> DoubleScanner;
-typedef TypedScanner<ByteArrayType> ByteArrayScanner;
-typedef TypedScanner<FLBAType> FixedLenByteArrayScanner;
-
-}  // namespace parquet
-
-#endif  // PARQUET_COLUMN_SCANNER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/statistics-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc
deleted file mode 100644
index e656f81..0000000
--- a/src/parquet/column/statistics-test.cc
+++ /dev/null
@@ -1,358 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <algorithm>
-#include <array>
-#include <cstdint>
-#include <cstring>
-#include <memory>
-#include <vector>
-
-#include "parquet/column/reader.h"
-#include "parquet/column/statistics.h"
-#include "parquet/column/test-specialization.h"
-#include "parquet/column/test-util.h"
-#include "parquet/column/writer.h"
-#include "parquet/file/reader.h"
-#include "parquet/file/writer.h"
-#include "parquet/schema.h"
-#include "parquet/thrift.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-
-using arrow::default_memory_pool;
-using arrow::MemoryPool;
-
-namespace parquet {
-
-using schema::NodePtr;
-using schema::PrimitiveNode;
-using schema::GroupNode;
-
-namespace test {
-
-template <typename TestType>
-class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> {
- public:
-  using T = typename TestType::c_type;
-  using TypedStats = TypedRowGroupStatistics<TestType>;
-
-  std::vector<T> GetDeepCopy(
-      const std::vector<T>&);  // allocates new memory for FLBA/ByteArray
-
-  T* GetValuesPointer(std::vector<T>&);
-  void DeepFree(std::vector<T>&);
-
-  void TestMinMaxEncode() {
-    this->GenerateData(1000);
-
-    TypedStats statistics1(this->schema_.Column(0));
-    statistics1.Update(this->values_ptr_, this->values_.size(), 0);
-    std::string encoded_min = statistics1.EncodeMin();
-    std::string encoded_max = statistics1.EncodeMax();
-
-    TypedStats statistics2(this->schema_.Column(0), encoded_min, encoded_max,
-        this->values_.size(), 0, 0, true);
-
-    TypedStats statistics3(this->schema_.Column(0));
-    std::vector<uint8_t> valid_bits(
-        BitUtil::RoundUpNumBytes(static_cast<uint32_t>(this->values_.size())) + 1, 255);
-    statistics3.UpdateSpaced(
-        this->values_ptr_, valid_bits.data(), 0, this->values_.size(), 0);
-    std::string encoded_min_spaced = statistics3.EncodeMin();
-    std::string encoded_max_spaced = statistics3.EncodeMax();
-
-    ASSERT_EQ(encoded_min, statistics2.EncodeMin());
-    ASSERT_EQ(encoded_max, statistics2.EncodeMax());
-    ASSERT_EQ(statistics1.min(), statistics2.min());
-    ASSERT_EQ(statistics1.max(), statistics2.max());
-    ASSERT_EQ(encoded_min_spaced, statistics2.EncodeMin());
-    ASSERT_EQ(encoded_max_spaced, statistics2.EncodeMax());
-    ASSERT_EQ(statistics3.min(), statistics2.min());
-    ASSERT_EQ(statistics3.max(), statistics2.max());
-  }
-
-  void TestReset() {
-    this->GenerateData(1000);
-
-    TypedStats statistics(this->schema_.Column(0));
-    statistics.Update(this->values_ptr_, this->values_.size(), 0);
-    ASSERT_EQ(this->values_.size(), statistics.num_values());
-
-    statistics.Reset();
-    ASSERT_EQ(0, statistics.null_count());
-    ASSERT_EQ(0, statistics.num_values());
-    ASSERT_EQ("", statistics.EncodeMin());
-    ASSERT_EQ("", statistics.EncodeMax());
-  }
-
-  void TestMerge() {
-    int num_null[2];
-    random_numbers(2, 42, 0, 100, num_null);
-
-    TypedStats statistics1(this->schema_.Column(0));
-    this->GenerateData(1000);
-    statistics1.Update(
-        this->values_ptr_, this->values_.size() - num_null[0], num_null[0]);
-
-    TypedStats statistics2(this->schema_.Column(0));
-    this->GenerateData(1000);
-    statistics2.Update(
-        this->values_ptr_, this->values_.size() - num_null[1], num_null[1]);
-
-    TypedStats total(this->schema_.Column(0));
-    total.Merge(statistics1);
-    total.Merge(statistics2);
-
-    ASSERT_EQ(num_null[0] + num_null[1], total.null_count());
-    ASSERT_EQ(this->values_.size() * 2 - num_null[0] - num_null[1], total.num_values());
-    ASSERT_EQ(total.min(), std::min(statistics1.min(), statistics2.min()));
-    ASSERT_EQ(total.max(), std::max(statistics1.max(), statistics2.max()));
-  }
-
-  void TestFullRoundtrip(int64_t num_values, int64_t null_count) {
-    this->GenerateData(num_values);
-
-    // compute statistics for the whole batch
-    TypedStats expected_stats(this->schema_.Column(0));
-    expected_stats.Update(this->values_ptr_, num_values - null_count, null_count);
-
-    auto sink = std::make_shared<InMemoryOutputStream>();
-    auto gnode = std::static_pointer_cast<GroupNode>(this->node_);
-    std::shared_ptr<WriterProperties> writer_properties =
-        WriterProperties::Builder().enable_statistics("column")->build();
-    auto file_writer = ParquetFileWriter::Open(sink, gnode, writer_properties);
-    auto row_group_writer = file_writer->AppendRowGroup(num_values);
-    auto column_writer =
-        static_cast<TypedColumnWriter<TestType>*>(row_group_writer->NextColumn());
-
-    // simulate the case when data comes from multiple buffers,
-    // in which case special care is necessary for FLBA/ByteArray types
-    for (int i = 0; i < 2; i++) {
-      int64_t batch_num_values = i ? num_values - num_values / 2 : num_values / 2;
-      int64_t batch_null_count = i ? null_count : 0;
-      DCHECK(null_count <= num_values);  // avoid too much headache
-      std::vector<int16_t> definition_levels(batch_null_count, 0);
-      definition_levels.insert(
-          definition_levels.end(), batch_num_values - batch_null_count, 1);
-      auto beg = this->values_.begin() + i * num_values / 2;
-      auto end = beg + batch_num_values;
-      std::vector<T> batch = GetDeepCopy(std::vector<T>(beg, end));
-      T* batch_values_ptr = GetValuesPointer(batch);
-      column_writer->WriteBatch(
-          batch_num_values, definition_levels.data(), nullptr, batch_values_ptr);
-      DeepFree(batch);
-    }
-    column_writer->Close();
-    row_group_writer->Close();
-    file_writer->Close();
-
-    auto buffer = sink->GetBuffer();
-    auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
-    auto file_reader = ParquetFileReader::Open(source);
-    auto rg_reader = file_reader->RowGroup(0);
-    auto column_chunk = rg_reader->metadata()->ColumnChunk(0);
-    if (!column_chunk->is_stats_set()) return;
-    std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics();
-    // check values after serialization + deserialization
-    ASSERT_EQ(null_count, stats->null_count());
-    ASSERT_EQ(num_values - null_count, stats->num_values());
-    ASSERT_EQ(expected_stats.EncodeMin(), stats->EncodeMin());
-    ASSERT_EQ(expected_stats.EncodeMax(), stats->EncodeMax());
-  }
-};
-
-template <typename TestType>
-typename TestType::c_type* TestRowGroupStatistics<TestType>::GetValuesPointer(
-    std::vector<typename TestType::c_type>& values) {
-  return values.data();
-}
-
-template <>
-bool* TestRowGroupStatistics<BooleanType>::GetValuesPointer(std::vector<bool>& values) {
-  static std::vector<uint8_t> bool_buffer;
-  bool_buffer.clear();
-  bool_buffer.resize(values.size());
-  std::copy(values.begin(), values.end(), bool_buffer.begin());
-  return reinterpret_cast<bool*>(bool_buffer.data());
-}
-
-template <typename TestType>
-typename std::vector<typename TestType::c_type>
-TestRowGroupStatistics<TestType>::GetDeepCopy(
-    const std::vector<typename TestType::c_type>& values) {
-  return values;
-}
-
-template <>
-std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy(
-    const std::vector<FLBA>& values) {
-  std::vector<FLBA> copy;
-  MemoryPool* pool = ::arrow::default_memory_pool();
-  for (const FLBA& flba : values) {
-    uint8_t* ptr;
-    PARQUET_THROW_NOT_OK(pool->Allocate(FLBA_LENGTH, &ptr));
-    memcpy(ptr, flba.ptr, FLBA_LENGTH);
-    copy.emplace_back(ptr);
-  }
-  return copy;
-}
-
-template <>
-std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy(
-    const std::vector<ByteArray>& values) {
-  std::vector<ByteArray> copy;
-  MemoryPool* pool = default_memory_pool();
-  for (const ByteArray& ba : values) {
-    uint8_t* ptr;
-    PARQUET_THROW_NOT_OK(pool->Allocate(ba.len, &ptr));
-    memcpy(ptr, ba.ptr, ba.len);
-    copy.emplace_back(ba.len, ptr);
-  }
-  return copy;
-}
-
-template <typename TestType>
-void TestRowGroupStatistics<TestType>::DeepFree(
-    std::vector<typename TestType::c_type>& values) {}
-
-template <>
-void TestRowGroupStatistics<FLBAType>::DeepFree(std::vector<FLBA>& values) {
-  MemoryPool* pool = default_memory_pool();
-  for (FLBA& flba : values) {
-    auto ptr = const_cast<uint8_t*>(flba.ptr);
-    memset(ptr, 0, FLBA_LENGTH);
-    pool->Free(ptr, FLBA_LENGTH);
-  }
-}
-
-template <>
-void TestRowGroupStatistics<ByteArrayType>::DeepFree(std::vector<ByteArray>& values) {
-  MemoryPool* pool = default_memory_pool();
-  for (ByteArray& ba : values) {
-    auto ptr = const_cast<uint8_t*>(ba.ptr);
-    memset(ptr, 0, ba.len);
-    pool->Free(ptr, ba.len);
-  }
-}
-
-template <>
-void TestRowGroupStatistics<ByteArrayType>::TestMinMaxEncode() {
-  this->GenerateData(1000);
-  // Test that we encode min max strings correctly
-  TypedRowGroupStatistics<ByteArrayType> statistics1(this->schema_.Column(0));
-  statistics1.Update(this->values_ptr_, this->values_.size(), 0);
-  std::string encoded_min = statistics1.EncodeMin();
-  std::string encoded_max = statistics1.EncodeMax();
-
-  // encoded is same as unencoded
-  ASSERT_EQ(encoded_min,
-      std::string((const char*)statistics1.min().ptr, statistics1.min().len));
-  ASSERT_EQ(encoded_max,
-      std::string((const char*)statistics1.max().ptr, statistics1.max().len));
-
-  TypedRowGroupStatistics<ByteArrayType> statistics2(this->schema_.Column(0), encoded_min,
-      encoded_max, this->values_.size(), 0, 0, true);
-
-  ASSERT_EQ(encoded_min, statistics2.EncodeMin());
-  ASSERT_EQ(encoded_max, statistics2.EncodeMax());
-  ASSERT_EQ(statistics1.min(), statistics2.min());
-  ASSERT_EQ(statistics1.max(), statistics2.max());
-}
-
-using TestTypes = ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    ByteArrayType, FLBAType, BooleanType>;
-
-TYPED_TEST_CASE(TestRowGroupStatistics, TestTypes);
-
-TYPED_TEST(TestRowGroupStatistics, MinMaxEncode) {
-  this->SetUpSchema(Repetition::REQUIRED);
-  this->TestMinMaxEncode();
-}
-
-TYPED_TEST(TestRowGroupStatistics, Reset) {
-  this->SetUpSchema(Repetition::OPTIONAL);
-  this->TestReset();
-}
-
-TYPED_TEST(TestRowGroupStatistics, FullRoundtrip) {
-  this->SetUpSchema(Repetition::OPTIONAL);
-  this->TestFullRoundtrip(100, 31);
-  this->TestFullRoundtrip(1000, 415);
-  this->TestFullRoundtrip(10000, 926);
-}
-
-template <typename TestType>
-class TestNumericRowGroupStatistics : public TestRowGroupStatistics<TestType> {};
-
-using NumericTypes = ::testing::Types<Int32Type, Int64Type, FloatType, DoubleType>;
-
-TYPED_TEST_CASE(TestNumericRowGroupStatistics, NumericTypes);
-
-TYPED_TEST(TestNumericRowGroupStatistics, Merge) {
-  this->SetUpSchema(Repetition::OPTIONAL);
-  this->TestMerge();
-}
-
-TEST(CorruptStatistics, Basics) {
-  ApplicationVersion version("parquet-mr version 1.8.0");
-  SchemaDescriptor schema;
-  schema::NodePtr node;
-  std::vector<schema::NodePtr> fields;
-  // Test Physical Types
-  fields.push_back(schema::PrimitiveNode::Make(
-      "col1", Repetition::OPTIONAL, Type::INT32, LogicalType::NONE));
-  fields.push_back(schema::PrimitiveNode::Make(
-      "col2", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::NONE));
-  // Test Logical Types
-  fields.push_back(schema::PrimitiveNode::Make(
-      "col3", Repetition::OPTIONAL, Type::INT32, LogicalType::DATE));
-  fields.push_back(schema::PrimitiveNode::Make(
-      "col4", Repetition::OPTIONAL, Type::INT32, LogicalType::UINT_32));
-  fields.push_back(schema::PrimitiveNode::Make("col5", Repetition::OPTIONAL,
-      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::INTERVAL, 12));
-  fields.push_back(schema::PrimitiveNode::Make(
-      "col6", Repetition::OPTIONAL, Type::BYTE_ARRAY, LogicalType::UTF8));
-  node = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
-  schema.Init(node);
-
-  format::ColumnChunk col_chunk;
-  col_chunk.meta_data.__isset.statistics = true;
-  auto column_chunk1 = ColumnChunkMetaData::Make(
-      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(0), &version);
-  ASSERT_TRUE(column_chunk1->is_stats_set());
-  auto column_chunk2 = ColumnChunkMetaData::Make(
-      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(1), &version);
-  ASSERT_FALSE(column_chunk2->is_stats_set());
-  auto column_chunk3 = ColumnChunkMetaData::Make(
-      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(2), &version);
-  ASSERT_TRUE(column_chunk3->is_stats_set());
-  auto column_chunk4 = ColumnChunkMetaData::Make(
-      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(3), &version);
-  ASSERT_FALSE(column_chunk4->is_stats_set());
-  auto column_chunk5 = ColumnChunkMetaData::Make(
-      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(4), &version);
-  ASSERT_FALSE(column_chunk5->is_stats_set());
-  auto column_chunk6 = ColumnChunkMetaData::Make(
-      reinterpret_cast<const uint8_t*>(&col_chunk), schema.Column(5), &version);
-  ASSERT_FALSE(column_chunk6->is_stats_set());
-}
-
-}  // namespace test
-}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/statistics.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc
deleted file mode 100644
index 961a2af..0000000
--- a/src/parquet/column/statistics.cc
+++ /dev/null
@@ -1,244 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <algorithm>
-#include <cstring>
-
-#include "parquet/column/statistics.h"
-#include "parquet/encoding-internal.h"
-#include "parquet/exception.h"
-#include "parquet/util/comparison.h"
-#include "parquet/util/memory.h"
-
-using arrow::default_memory_pool;
-using arrow::MemoryPool;
-
-namespace parquet {
-
-template <typename DType>
-TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(
-    const ColumnDescriptor* schema, MemoryPool* pool)
-    : pool_(pool),
-      min_buffer_(AllocateBuffer(pool_, 0)),
-      max_buffer_(AllocateBuffer(pool_, 0)) {
-  SetDescr(schema);
-  Reset();
-}
-
-template <typename DType>
-TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_type& min,
-    const typename DType::c_type& max, int64_t num_values, int64_t null_count,
-    int64_t distinct_count)
-    : pool_(default_memory_pool()),
-      min_buffer_(AllocateBuffer(pool_, 0)),
-      max_buffer_(AllocateBuffer(pool_, 0)) {
-  IncrementNumValues(num_values);
-  IncrementNullCount(null_count);
-  IncrementDistinctCount(distinct_count);
-
-  Copy(min, &min_, min_buffer_.get());
-  Copy(max, &max_, max_buffer_.get());
-  has_min_max_ = true;
-}
-
-template <typename DType>
-TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor* schema,
-    const std::string& encoded_min, const std::string& encoded_max, int64_t num_values,
-    int64_t null_count, int64_t distinct_count, bool has_min_max, MemoryPool* pool)
-    : pool_(pool),
-      min_buffer_(AllocateBuffer(pool_, 0)),
-      max_buffer_(AllocateBuffer(pool_, 0)) {
-  IncrementNumValues(num_values);
-  IncrementNullCount(null_count);
-  IncrementDistinctCount(distinct_count);
-
-  SetDescr(schema);
-
-  if (!encoded_min.empty()) { PlainDecode(encoded_min, &min_); }
-  if (!encoded_max.empty()) { PlainDecode(encoded_max, &max_); }
-  has_min_max_ = has_min_max;
-}
-
-template <typename DType>
-bool TypedRowGroupStatistics<DType>::HasMinMax() const {
-  return has_min_max_;
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::Reset() {
-  ResetCounts();
-  has_min_max_ = false;
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::Update(
-    const T* values, int64_t num_not_null, int64_t num_null) {
-  DCHECK(num_not_null >= 0);
-  DCHECK(num_null >= 0);
-
-  IncrementNullCount(num_null);
-  IncrementNumValues(num_not_null);
-  // TODO: support distinct count?
-  if (num_not_null == 0) return;
-
-  Compare<T> compare(descr_);
-  auto batch_minmax = std::minmax_element(values, values + num_not_null, compare);
-  if (!has_min_max_) {
-    has_min_max_ = true;
-    Copy(*batch_minmax.first, &min_, min_buffer_.get());
-    Copy(*batch_minmax.second, &max_, max_buffer_.get());
-  } else {
-    Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get());
-    Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get());
-  }
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
-    const uint8_t* valid_bits, int64_t valid_bits_offset, int64_t num_not_null,
-    int64_t num_null) {
-  DCHECK(num_not_null >= 0);
-  DCHECK(num_null >= 0);
-
-  IncrementNullCount(num_null);
-  IncrementNumValues(num_not_null);
-  // TODO: support distinct count?
-  if (num_not_null == 0) return;
-
-  Compare<T> compare(descr_);
-  INIT_BITSET(valid_bits, static_cast<int>(valid_bits_offset));
-  // Find first valid entry and use that for min/max
-  // As (num_not_null != 0) there must be one
-  int64_t length = num_null + num_not_null;
-  int64_t i = 0;
-  for (; i < length; i++) {
-    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { break; }
-    READ_NEXT_BITSET(valid_bits);
-  }
-  T min = values[i];
-  T max = values[i];
-  for (; i < length; i++) {
-    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
-      if (compare(values[i], min)) {
-        min = values[i];
-      } else if (compare(max, values[i])) {
-        max = values[i];
-      }
-    }
-    READ_NEXT_BITSET(valid_bits);
-  }
-  if (!has_min_max_) {
-    has_min_max_ = true;
-    Copy(min, &min_, min_buffer_.get());
-    Copy(max, &max_, max_buffer_.get());
-  } else {
-    Copy(std::min(min_, min, compare), &min_, min_buffer_.get());
-    Copy(std::max(max_, max, compare), &max_, max_buffer_.get());
-  }
-}
-
-template <typename DType>
-const typename DType::c_type& TypedRowGroupStatistics<DType>::min() const {
-  return min_;
-}
-
-template <typename DType>
-const typename DType::c_type& TypedRowGroupStatistics<DType>::max() const {
-  return max_;
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::Merge(const TypedRowGroupStatistics<DType>& other) {
-  this->MergeCounts(other);
-
-  if (!other.HasMinMax()) return;
-
-  if (!has_min_max_) {
-    Copy(other.min_, &this->min_, min_buffer_.get());
-    Copy(other.max_, &this->max_, max_buffer_.get());
-    has_min_max_ = true;
-    return;
-  }
-
-  Compare<T> compare(descr_);
-  Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get());
-  Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get());
-}
-
-template <typename DType>
-std::string TypedRowGroupStatistics<DType>::EncodeMin() {
-  std::string s;
-  if (HasMinMax()) this->PlainEncode(min_, &s);
-  return s;
-}
-
-template <typename DType>
-std::string TypedRowGroupStatistics<DType>::EncodeMax() {
-  std::string s;
-  if (HasMinMax()) this->PlainEncode(max_, &s);
-  return s;
-}
-
-template <typename DType>
-EncodedStatistics TypedRowGroupStatistics<DType>::Encode() {
-  EncodedStatistics s;
-  if (HasMinMax()) {
-    s.set_min(this->EncodeMin());
-    s.set_max(this->EncodeMax());
-  }
-  s.set_null_count(this->null_count());
-  return s;
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::PlainEncode(const T& src, std::string* dst) {
-  PlainEncoder<DType> encoder(descr(), pool_);
-  encoder.Put(&src, 1);
-  auto buffer = encoder.FlushValues();
-  auto ptr = reinterpret_cast<const char*>(buffer->data());
-  dst->assign(ptr, buffer->size());
-}
-
-template <typename DType>
-void TypedRowGroupStatistics<DType>::PlainDecode(const std::string& src, T* dst) {
-  PlainDecoder<DType> decoder(descr());
-  decoder.SetData(
-      1, reinterpret_cast<const uint8_t*>(src.c_str()), static_cast<int>(src.size()));
-  decoder.Decode(dst, 1);
-}
-
-template <>
-void TypedRowGroupStatistics<ByteArrayType>::PlainEncode(const T& src, std::string* dst) {
-  dst->assign(reinterpret_cast<const char*>(src.ptr), src.len);
-}
-
-template <>
-void TypedRowGroupStatistics<ByteArrayType>::PlainDecode(const std::string& src, T* dst) {
-  dst->len = static_cast<uint32_t>(src.size());
-  dst->ptr = reinterpret_cast<const uint8_t*>(src.c_str());
-}
-
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<BooleanType>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int32Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int64Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<Int96Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FloatType>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<DoubleType>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<ByteArrayType>;
-template class PARQUET_TEMPLATE_EXPORT TypedRowGroupStatistics<FLBAType>;
-
-}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/statistics.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h
deleted file mode 100644
index c6a2487..0000000
--- a/src/parquet/column/statistics.h
+++ /dev/null
@@ -1,234 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COLUMN_STATISTICS_H
-#define PARQUET_COLUMN_STATISTICS_H
-
-#include <cstdint>
-#include <memory>
-#include <string>
-
-#include "parquet/schema.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-class PARQUET_EXPORT EncodedStatistics {
-  std::shared_ptr<std::string> max_, min_;
-
- public:
-  EncodedStatistics()
-      : max_(std::make_shared<std::string>()), min_(std::make_shared<std::string>()) {}
-
-  const std::string& max() const { return *max_; }
-  const std::string& min() const { return *min_; }
-
-  int64_t null_count = 0;
-  int64_t distinct_count = 0;
-
-  bool has_min = false;
-  bool has_max = false;
-  bool has_null_count = false;
-  bool has_distinct_count = false;
-
-  inline bool is_set() const {
-    return has_min || has_max || has_null_count || has_distinct_count;
-  }
-
-  inline EncodedStatistics& set_max(const std::string& value) {
-    *max_ = value;
-    has_max = true;
-    return *this;
-  }
-
-  inline EncodedStatistics& set_min(const std::string& value) {
-    *min_ = value;
-    has_min = true;
-    return *this;
-  }
-
-  inline EncodedStatistics& set_null_count(int64_t value) {
-    null_count = value;
-    has_null_count = true;
-    return *this;
-  }
-
-  inline EncodedStatistics& set_distinct_count(int64_t value) {
-    distinct_count = value;
-    has_distinct_count = true;
-    return *this;
-  }
-};
-
-template <typename DType>
-class PARQUET_EXPORT TypedRowGroupStatistics;
-
-class PARQUET_EXPORT RowGroupStatistics
-    : public std::enable_shared_from_this<RowGroupStatistics> {
- public:
-  int64_t null_count() const { return statistics_.null_count; }
-  int64_t distinct_count() const { return statistics_.distinct_count; }
-  int64_t num_values() const { return num_values_; }
-
-  virtual bool HasMinMax() const = 0;
-  virtual void Reset() = 0;
-
-  // Plain-encoded minimum value
-  virtual std::string EncodeMin() = 0;
-
-  // Plain-encoded maximum value
-  virtual std::string EncodeMax() = 0;
-
-  virtual EncodedStatistics Encode() = 0;
-
-  virtual ~RowGroupStatistics() {}
-
-  Type::type physical_type() const { return descr_->physical_type(); }
-
- protected:
-  const ColumnDescriptor* descr() const { return descr_; }
-  void SetDescr(const ColumnDescriptor* schema) { descr_ = schema; }
-
-  void IncrementNullCount(int64_t n) { statistics_.null_count += n; }
-
-  void IncrementNumValues(int64_t n) { num_values_ += n; }
-
-  void IncrementDistinctCount(int64_t n) { statistics_.distinct_count += n; }
-
-  void MergeCounts(const RowGroupStatistics& other) {
-    this->statistics_.null_count += other.statistics_.null_count;
-    this->statistics_.distinct_count += other.statistics_.distinct_count;
-    this->num_values_ += other.num_values_;
-  }
-
-  void ResetCounts() {
-    this->statistics_.null_count = 0;
-    this->statistics_.distinct_count = 0;
-    this->num_values_ = 0;
-  }
-
-  const ColumnDescriptor* descr_ = nullptr;
-  int64_t num_values_ = 0;
-  EncodedStatistics statistics_;
-};
-
-template <typename DType>
-class TypedRowGroupStatistics : public RowGroupStatistics {
- public:
-  using T = typename DType::c_type;
-
-  TypedRowGroupStatistics(const ColumnDescriptor* schema,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
-  TypedRowGroupStatistics(const T& min, const T& max, int64_t num_values,
-      int64_t null_count, int64_t distinct_count);
-
-  TypedRowGroupStatistics(const ColumnDescriptor* schema, const std::string& encoded_min,
-      const std::string& encoded_max, int64_t num_values, int64_t null_count,
-      int64_t distinct_count, bool has_min_max,
-      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
-  bool HasMinMax() const override;
-  void Reset() override;
-  void Merge(const TypedRowGroupStatistics<DType>& other);
-
-  void Update(const T* values, int64_t num_not_null, int64_t num_null);
-  void UpdateSpaced(const T* values, const uint8_t* valid_bits, int64_t valid_bits_spaced,
-      int64_t num_not_null, int64_t num_null);
-
-  const T& min() const;
-  const T& max() const;
-
-  std::string EncodeMin() override;
-  std::string EncodeMax() override;
-  EncodedStatistics Encode() override;
-
- private:
-  bool has_min_max_ = false;
-  T min_;
-  T max_;
-  ::arrow::MemoryPool* pool_;
-
-  void PlainEncode(const T& src, std::string* dst);
-  void PlainDecode(const std::string& src, T* dst);
-  void Copy(const T& src, T* dst, PoolBuffer* buffer);
-
-  std::shared_ptr<PoolBuffer> min_buffer_, max_buffer_;
-};
-
-template <typename DType>
-inline void TypedRowGroupStatistics<DType>::Copy(const T& src, T* dst, PoolBuffer*) {
-  *dst = src;
-}
-
-template <>
-inline void TypedRowGroupStatistics<FLBAType>::Copy(
-    const FLBA& src, FLBA* dst, PoolBuffer* buffer) {
-  if (dst->ptr == src.ptr) return;
-  uint32_t len = descr_->type_length();
-  PARQUET_THROW_NOT_OK(buffer->Resize(len, false));
-  std::memcpy(buffer->mutable_data(), src.ptr, len);
-  *dst = FLBA(buffer->data());
-}
-
-template <>
-inline void TypedRowGroupStatistics<ByteArrayType>::Copy(
-    const ByteArray& src, ByteArray* dst, PoolBuffer* buffer) {
-  if (dst->ptr == src.ptr) return;
-  PARQUET_THROW_NOT_OK(buffer->Resize(src.len, false));
-  std::memcpy(buffer->mutable_data(), src.ptr, src.len);
-  *dst = ByteArray(src.len, buffer->data());
-}
-
-template <>
-void TypedRowGroupStatistics<ByteArrayType>::PlainEncode(const T& src, std::string* dst);
-
-template <>
-void TypedRowGroupStatistics<ByteArrayType>::PlainDecode(const std::string& src, T* dst);
-
-typedef TypedRowGroupStatistics<BooleanType> BoolStatistics;
-typedef TypedRowGroupStatistics<Int32Type> Int32Statistics;
-typedef TypedRowGroupStatistics<Int64Type> Int64Statistics;
-typedef TypedRowGroupStatistics<Int96Type> Int96Statistics;
-typedef TypedRowGroupStatistics<FloatType> FloatStatistics;
-typedef TypedRowGroupStatistics<DoubleType> DoubleStatistics;
-typedef TypedRowGroupStatistics<ByteArrayType> ByteArrayStatistics;
-typedef TypedRowGroupStatistics<FLBAType> FLBAStatistics;
-
-#if defined(__GNUC__) && !defined(__clang__)
-#pragma GCC diagnostic push
-#pragma GCC diagnostic ignored "-Wattributes"
-#endif
-
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<BooleanType>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int32Type>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int64Type>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<Int96Type>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<FloatType>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<DoubleType>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<ByteArrayType>;
-PARQUET_EXTERN_TEMPLATE TypedRowGroupStatistics<FLBAType>;
-
-#if defined(__GNUC__) && !defined(__clang__)
-#pragma GCC diagnostic pop
-#endif
-
-}  // namespace parquet
-
-#endif  // PARQUET_COLUMN_STATISTICS_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/test-specialization.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-specialization.h b/src/parquet/column/test-specialization.h
deleted file mode 100644
index 07767c0..0000000
--- a/src/parquet/column/test-specialization.h
+++ /dev/null
@@ -1,172 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// This module defines an abstract interface for iterating through pages in a
-// Parquet column chunk within a row group. It could be extended in the future
-// to iterate through all data pages in all chunks in a file.
-
-#ifndef PARQUET_COLUMN_TEST_SPECIALIZATION_H
-#define PARQUET_COLUMN_TEST_SPECIALIZATION_H
-
-#include <algorithm>
-#include <limits>
-#include <sstream>
-#include <string>
-#include <vector>
-
-#include "parquet/column/test-util.h"
-
-namespace parquet {
-
-namespace test {
-
-template <>
-void InitValues<bool>(int num_values, vector<bool>& values, vector<uint8_t>& buffer) {
-  values = flip_coins(num_values, 0);
-}
-
-template <>
-void InitValues<ByteArray>(
-    int num_values, vector<ByteArray>& values, vector<uint8_t>& buffer) {
-  int max_byte_array_len = 12;
-  int num_bytes = max_byte_array_len + sizeof(uint32_t);
-  size_t nbytes = num_values * num_bytes;
-  buffer.resize(nbytes);
-  random_byte_array(num_values, 0, buffer.data(), values.data(), max_byte_array_len);
-}
-
-template <>
-void InitValues<FLBA>(int num_values, vector<FLBA>& values, vector<uint8_t>& buffer) {
-  size_t nbytes = num_values * FLBA_LENGTH;
-  buffer.resize(nbytes);
-  random_fixed_byte_array(num_values, 0, buffer.data(), FLBA_LENGTH, values.data());
-}
-
-template <>
-void InitValues<Int96>(int num_values, vector<Int96>& values, vector<uint8_t>& buffer) {
-  random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(),
-      std::numeric_limits<int32_t>::max(), values.data());
-}
-
-inline std::string TestColumnName(int i) {
-  std::stringstream col_name;
-  col_name << "column_" << i;
-  return col_name.str();
-}
-
-// This class lives here because of its dependency on the InitValues specializations.
-template <typename TestType>
-class PrimitiveTypedTest : public ::testing::Test {
- public:
-  typedef typename TestType::c_type T;
-
-  void SetUpSchema(Repetition::type repetition, int num_columns = 1) {
-    std::vector<schema::NodePtr> fields;
-
-    for (int i = 0; i < num_columns; ++i) {
-      std::string name = TestColumnName(i);
-      fields.push_back(schema::PrimitiveNode::Make(
-          name, repetition, TestType::type_num, LogicalType::NONE, FLBA_LENGTH));
-    }
-    node_ = schema::GroupNode::Make("schema", Repetition::REQUIRED, fields);
-    schema_.Init(node_);
-  }
-
-  void GenerateData(int64_t num_values);
-  void SetupValuesOut(int64_t num_values);
-  void SyncValuesOut();
-
- protected:
-  schema::NodePtr node_;
-  SchemaDescriptor schema_;
-
-  // Input buffers
-  std::vector<T> values_;
-
-  std::vector<int16_t> def_levels_;
-
-  std::vector<uint8_t> buffer_;
-  // Pointer to the values, needed as we cannot use vector<bool>::data()
-  T* values_ptr_;
-  std::vector<uint8_t> bool_buffer_;
-
-  // Output buffers
-  std::vector<T> values_out_;
-  std::vector<uint8_t> bool_buffer_out_;
-  T* values_out_ptr_;
-};
-
-template <typename TestType>
-void PrimitiveTypedTest<TestType>::SyncValuesOut() {}
-
-template <>
-void PrimitiveTypedTest<BooleanType>::SyncValuesOut() {
-    std::vector<uint8_t>::const_iterator source_iterator = bool_buffer_out_.begin();
-    std::vector<T>::iterator destination_iterator = values_out_.begin();
-    while (source_iterator != bool_buffer_out_.end()) {
-        *destination_iterator++ = *source_iterator++ != 0;
-    }
-}
-
-template <typename TestType>
-void PrimitiveTypedTest<TestType>::SetupValuesOut(int64_t num_values) {
-  values_out_.clear();
-  values_out_.resize(num_values);
-  values_out_ptr_ = values_out_.data();
-}
-
-template <>
-void PrimitiveTypedTest<BooleanType>::SetupValuesOut(int64_t num_values) {
-  values_out_.clear();
-  values_out_.resize(num_values);
-
-  bool_buffer_out_.clear();
-  bool_buffer_out_.resize(num_values);
-  // Write once to all values so we can copy it without getting Valgrind errors
-  // about uninitialised values.
-  std::fill(bool_buffer_out_.begin(), bool_buffer_out_.end(), true);
-  values_out_ptr_ = reinterpret_cast<bool*>(bool_buffer_out_.data());
-}
-
-template <typename TestType>
-void PrimitiveTypedTest<TestType>::GenerateData(int64_t num_values) {
-  def_levels_.resize(num_values);
-  values_.resize(num_values);
-
-  InitValues<T>(static_cast<int>(num_values), values_, buffer_);
-  values_ptr_ = values_.data();
-
-  std::fill(def_levels_.begin(), def_levels_.end(), 1);
-}
-
-template <>
-void PrimitiveTypedTest<BooleanType>::GenerateData(int64_t num_values) {
-  def_levels_.resize(num_values);
-  values_.resize(num_values);
-
-  InitValues<T>(static_cast<int>(num_values), values_, buffer_);
-  bool_buffer_.resize(num_values);
-  std::copy(values_.begin(), values_.end(), bool_buffer_.begin());
-  values_ptr_ = reinterpret_cast<bool*>(bool_buffer_.data());
-
-  std::fill(def_levels_.begin(), def_levels_.end(), 1);
-}
-}  // namespace test
-
-}  // namespace parquet
-
-#endif  // PARQUET_COLUMN_TEST_SPECIALIZATION_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
deleted file mode 100644
index c133734..0000000
--- a/src/parquet/column/test-util.h
+++ /dev/null
@@ -1,429 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// This module defines an abstract interface for iterating through pages in a
-// Parquet column chunk within a row group. It could be extended in the future
-// to iterate through all data pages in all chunks in a file.
-
-#ifndef PARQUET_COLUMN_TEST_UTIL_H
-#define PARQUET_COLUMN_TEST_UTIL_H
-
-#include <algorithm>
-#include <limits>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include <gtest/gtest.h>
-
-#include "parquet/column/levels.h"
-#include "parquet/column/page.h"
-#include "parquet/encoding-internal.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/test-common.h"
-
-using std::vector;
-using std::shared_ptr;
-
-namespace parquet {
-
-static int FLBA_LENGTH = 12;
-
-bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
-  return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
-}
-
-namespace test {
-
-template <typename T>
-static void InitValues(int num_values, vector<T>& values, vector<uint8_t>& buffer) {
-  random_numbers(num_values, 0, std::numeric_limits<T>::min(),
-      std::numeric_limits<T>::max(), values.data());
-}
-
-template <typename T>
-static void InitDictValues(
-    int num_values, int num_dicts, vector<T>& values, vector<uint8_t>& buffer) {
-  int repeat_factor = num_values / num_dicts;
-  InitValues<T>(num_dicts, values, buffer);
-  // add some repeated values
-  for (int j = 1; j < repeat_factor; ++j) {
-    for (int i = 0; i < num_dicts; ++i) {
-      std::memcpy(&values[num_dicts * j + i], &values[i], sizeof(T));
-    }
-  }
-  // computed only dict_per_page * repeat_factor - 1 values < num_values
-  // compute remaining
-  for (int i = num_dicts * repeat_factor; i < num_values; ++i) {
-    std::memcpy(&values[i], &values[i - num_dicts * repeat_factor], sizeof(T));
-  }
-}
-
-class MockPageReader : public PageReader {
- public:
-  explicit MockPageReader(const vector<shared_ptr<Page>>& pages)
-      : pages_(pages), page_index_(0) {}
-
-  // Implement the PageReader interface
-  virtual shared_ptr<Page> NextPage() {
-    if (page_index_ == static_cast<int>(pages_.size())) {
-      // EOS to consumer
-      return shared_ptr<Page>(nullptr);
-    }
-    return pages_[page_index_++];
-  }
-
- private:
-  vector<shared_ptr<Page>> pages_;
-  int page_index_;
-};
-
-// TODO(wesm): this is only used for testing for now. Refactor to form part of
-// primary file write path
-template <typename Type>
-class DataPageBuilder {
- public:
-  typedef typename Type::c_type T;
-
-  // This class writes data and metadata to the passed inputs
-  explicit DataPageBuilder(InMemoryOutputStream* sink)
-      : sink_(sink),
-        num_values_(0),
-        encoding_(Encoding::PLAIN),
-        definition_level_encoding_(Encoding::RLE),
-        repetition_level_encoding_(Encoding::RLE),
-        have_def_levels_(false),
-        have_rep_levels_(false),
-        have_values_(false) {}
-
-  void AppendDefLevels(const vector<int16_t>& levels, int16_t max_level,
-      Encoding::type encoding = Encoding::RLE) {
-    AppendLevels(levels, max_level, encoding);
-
-    num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
-    definition_level_encoding_ = encoding;
-    have_def_levels_ = true;
-  }
-
-  void AppendRepLevels(const vector<int16_t>& levels, int16_t max_level,
-      Encoding::type encoding = Encoding::RLE) {
-    AppendLevels(levels, max_level, encoding);
-
-    num_values_ = std::max(static_cast<int32_t>(levels.size()), num_values_);
-    repetition_level_encoding_ = encoding;
-    have_rep_levels_ = true;
-  }
-
-  void AppendValues(const ColumnDescriptor* d, const vector<T>& values,
-      Encoding::type encoding = Encoding::PLAIN) {
-    PlainEncoder<Type> encoder(d);
-    encoder.Put(&values[0], static_cast<int>(values.size()));
-    std::shared_ptr<Buffer> values_sink = encoder.FlushValues();
-    sink_->Write(values_sink->data(), values_sink->size());
-
-    num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
-    encoding_ = encoding;
-    have_values_ = true;
-  }
-
-  int32_t num_values() const { return num_values_; }
-
-  Encoding::type encoding() const { return encoding_; }
-
-  Encoding::type rep_level_encoding() const { return repetition_level_encoding_; }
-
-  Encoding::type def_level_encoding() const { return definition_level_encoding_; }
-
- private:
-  InMemoryOutputStream* sink_;
-
-  int32_t num_values_;
-  Encoding::type encoding_;
-  Encoding::type definition_level_encoding_;
-  Encoding::type repetition_level_encoding_;
-
-  bool have_def_levels_;
-  bool have_rep_levels_;
-  bool have_values_;
-
-  // Used internally for both repetition and definition levels
-  void AppendLevels(
-      const vector<int16_t>& levels, int16_t max_level, Encoding::type encoding) {
-    if (encoding != Encoding::RLE) {
-      ParquetException::NYI("only rle encoding currently implemented");
-    }
-
-    // TODO: compute a more precise maximum size for the encoded levels
-    vector<uint8_t> encode_buffer(levels.size() * 2);
-
-    // We encode into separate memory from the output stream because the
-    // RLE-encoded bytes have to be preceded in the stream by their absolute
-    // size.
-    LevelEncoder encoder;
-    encoder.Init(encoding, max_level, static_cast<int>(levels.size()),
-        encode_buffer.data(), static_cast<int>(encode_buffer.size()));
-
-    encoder.Encode(static_cast<int>(levels.size()), levels.data());
-
-    int32_t rle_bytes = encoder.len();
-    sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(int32_t));
-    sink_->Write(encode_buffer.data(), rle_bytes);
-  }
-};
-
-template <>
-void DataPageBuilder<BooleanType>::AppendValues(
-    const ColumnDescriptor* d, const vector<bool>& values, Encoding::type encoding) {
-  if (encoding != Encoding::PLAIN) {
-    ParquetException::NYI("only plain encoding currently implemented");
-  }
-  PlainEncoder<BooleanType> encoder(d);
-  encoder.Put(values, static_cast<int>(values.size()));
-  std::shared_ptr<Buffer> buffer = encoder.FlushValues();
-  sink_->Write(buffer->data(), buffer->size());
-
-  num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
-  encoding_ = encoding;
-  have_values_ = true;
-}
-
-template <typename Type>
-static shared_ptr<DataPage> MakeDataPage(const ColumnDescriptor* d,
-    const vector<typename Type::c_type>& values, int num_vals, Encoding::type encoding,
-    const uint8_t* indices, int indices_size, const vector<int16_t>& def_levels,
-    int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level) {
-  int num_values = 0;
-
-  InMemoryOutputStream page_stream;
-  test::DataPageBuilder<Type> page_builder(&page_stream);
-
-  if (!rep_levels.empty()) { page_builder.AppendRepLevels(rep_levels, max_rep_level); }
-  if (!def_levels.empty()) { page_builder.AppendDefLevels(def_levels, max_def_level); }
-
-  if (encoding == Encoding::PLAIN) {
-    page_builder.AppendValues(d, values, encoding);
-    num_values = page_builder.num_values();
-  } else {  // DICTIONARY PAGES
-    page_stream.Write(indices, indices_size);
-    num_values = std::max(page_builder.num_values(), num_vals);
-  }
-
-  auto buffer = page_stream.GetBuffer();
-
-  return std::make_shared<DataPage>(buffer, num_values, encoding,
-      page_builder.def_level_encoding(), page_builder.rep_level_encoding());
-}
-
-template <typename TYPE>
-class DictionaryPageBuilder {
- public:
-  typedef typename TYPE::c_type TC;
-  static constexpr int TN = TYPE::type_num;
-
-  // This class writes data and metadata to the passed inputs
-  explicit DictionaryPageBuilder(const ColumnDescriptor* d)
-      : num_dict_values_(0), have_values_(false) {
-    encoder_.reset(new DictEncoder<TYPE>(d, &pool_));
-  }
-
-  ~DictionaryPageBuilder() { pool_.FreeAll(); }
-
-  shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
-    int num_values = static_cast<int>(values.size());
-    // Dictionary encoding
-    encoder_->Put(values.data(), num_values);
-    num_dict_values_ = encoder_->num_entries();
-    have_values_ = true;
-    return encoder_->FlushValues();
-  }
-
-  shared_ptr<Buffer> WriteDict() {
-    std::shared_ptr<PoolBuffer> dict_buffer =
-        AllocateBuffer(::arrow::default_memory_pool(), encoder_->dict_encoded_size());
-    encoder_->WriteDict(dict_buffer->mutable_data());
-    return dict_buffer;
-  }
-
-  int32_t num_values() const { return num_dict_values_; }
-
- private:
-  ChunkedAllocator pool_;
-  shared_ptr<DictEncoder<TYPE>> encoder_;
-  int32_t num_dict_values_;
-  bool have_values_;
-};
-
-template <>
-DictionaryPageBuilder<BooleanType>::DictionaryPageBuilder(const ColumnDescriptor* d) {
-  ParquetException::NYI("only plain encoding currently implemented for boolean");
-}
-
-template <>
-shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::WriteDict() {
-  ParquetException::NYI("only plain encoding currently implemented for boolean");
-  return nullptr;
-}
-
-template <>
-shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::AppendValues(
-    const vector<TC>& values) {
-  ParquetException::NYI("only plain encoding currently implemented for boolean");
-  return nullptr;
-}
-
-template <typename Type>
-static shared_ptr<DictionaryPage> MakeDictPage(const ColumnDescriptor* d,
-    const vector<typename Type::c_type>& values, const vector<int>& values_per_page,
-    Encoding::type encoding, vector<shared_ptr<Buffer>>& rle_indices) {
-  InMemoryOutputStream page_stream;
-  test::DictionaryPageBuilder<Type> page_builder(d);
-  int num_pages = static_cast<int>(values_per_page.size());
-  int value_start = 0;
-
-  for (int i = 0; i < num_pages; i++) {
-    rle_indices.push_back(page_builder.AppendValues(
-        slice(values, value_start, value_start + values_per_page[i])));
-    value_start += values_per_page[i];
-  }
-
-  auto buffer = page_builder.WriteDict();
-
-  return std::make_shared<DictionaryPage>(
-      buffer, page_builder.num_values(), Encoding::PLAIN);
-}
-
-// Given def/rep levels and values create multiple dict pages
-template <typename Type>
-static void PaginateDict(const ColumnDescriptor* d,
-    const vector<typename Type::c_type>& values, const vector<int16_t>& def_levels,
-    int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level,
-    int num_levels_per_page, const vector<int>& values_per_page,
-    vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::RLE_DICTIONARY) {
-  int num_pages = static_cast<int>(values_per_page.size());
-  vector<shared_ptr<Buffer>> rle_indices;
-  shared_ptr<DictionaryPage> dict_page =
-      MakeDictPage<Type>(d, values, values_per_page, encoding, rle_indices);
-  pages.push_back(dict_page);
-  int def_level_start = 0;
-  int def_level_end = 0;
-  int rep_level_start = 0;
-  int rep_level_end = 0;
-  for (int i = 0; i < num_pages; i++) {
-    if (max_def_level > 0) {
-      def_level_start = i * num_levels_per_page;
-      def_level_end = (i + 1) * num_levels_per_page;
-    }
-    if (max_rep_level > 0) {
-      rep_level_start = i * num_levels_per_page;
-      rep_level_end = (i + 1) * num_levels_per_page;
-    }
-    shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(d, {}, values_per_page[i],
-        encoding, rle_indices[i]->data(), static_cast<int>(rle_indices[i]->size()),
-        slice(def_levels, def_level_start, def_level_end), max_def_level,
-        slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
-    pages.push_back(data_page);
-  }
-}
-
-// Given def/rep levels and values create multiple plain pages
-template <typename Type>
-static void PaginatePlain(const ColumnDescriptor* d,
-    const vector<typename Type::c_type>& values, const vector<int16_t>& def_levels,
-    int16_t max_def_level, const vector<int16_t>& rep_levels, int16_t max_rep_level,
-    int num_levels_per_page, const vector<int>& values_per_page,
-    vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::PLAIN) {
-  int num_pages = static_cast<int>(values_per_page.size());
-  int def_level_start = 0;
-  int def_level_end = 0;
-  int rep_level_start = 0;
-  int rep_level_end = 0;
-  int value_start = 0;
-  for (int i = 0; i < num_pages; i++) {
-    if (max_def_level > 0) {
-      def_level_start = i * num_levels_per_page;
-      def_level_end = (i + 1) * num_levels_per_page;
-    }
-    if (max_rep_level > 0) {
-      rep_level_start = i * num_levels_per_page;
-      rep_level_end = (i + 1) * num_levels_per_page;
-    }
-    shared_ptr<DataPage> page = MakeDataPage<Type>(d,
-        slice(values, value_start, value_start + values_per_page[i]), values_per_page[i],
-        encoding, NULL, 0, slice(def_levels, def_level_start, def_level_end),
-        max_def_level, slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
-    pages.push_back(page);
-    value_start += values_per_page[i];
-  }
-}
-
-// Generates pages from randomly generated data
-template <typename Type>
-static int MakePages(const ColumnDescriptor* d, int num_pages, int levels_per_page,
-    vector<int16_t>& def_levels, vector<int16_t>& rep_levels,
-    vector<typename Type::c_type>& values, vector<uint8_t>& buffer,
-    vector<shared_ptr<Page>>& pages, Encoding::type encoding = Encoding::PLAIN) {
-  int num_levels = levels_per_page * num_pages;
-  int num_values = 0;
-  uint32_t seed = 0;
-  int16_t zero = 0;
-  int16_t max_def_level = d->max_definition_level();
-  int16_t max_rep_level = d->max_repetition_level();
-  vector<int> values_per_page(num_pages, levels_per_page);
-  // Create definition levels
-  if (max_def_level > 0) {
-    def_levels.resize(num_levels);
-    random_numbers(num_levels, seed, zero, max_def_level, def_levels.data());
-    for (int p = 0; p < num_pages; p++) {
-      int num_values_per_page = 0;
-      for (int i = 0; i < levels_per_page; i++) {
-        if (def_levels[i + p * levels_per_page] == max_def_level) {
-          num_values_per_page++;
-          num_values++;
-        }
-      }
-      values_per_page[p] = num_values_per_page;
-    }
-  } else {
-    num_values = num_levels;
-  }
-  // Create repetition levels
-  if (max_rep_level > 0) {
-    rep_levels.resize(num_levels);
-    random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data());
-  }
-  // Create values
-  values.resize(num_values);
-  if (encoding == Encoding::PLAIN) {
-    InitValues<typename Type::c_type>(num_values, values, buffer);
-    PaginatePlain<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level,
-        levels_per_page, values_per_page, pages);
-  } else if (encoding == Encoding::RLE_DICTIONARY ||
-             encoding == Encoding::PLAIN_DICTIONARY) {
-    // Calls InitValues and repeats the data
-    InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer);
-    PaginateDict<Type>(d, values, def_levels, max_def_level, rep_levels, max_rep_level,
-        levels_per_page, values_per_page, pages);
-  }
-
-  return num_values;
-}
-
-}  // namespace test
-
-}  // namespace parquet
-
-#endif  // PARQUET_COLUMN_TEST_UTIL_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
deleted file mode 100644
index 59f9999..0000000
--- a/src/parquet/column/writer.cc
+++ /dev/null
@@ -1,528 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/column/writer.h"
-
-#include "parquet/column/properties.h"
-#include "parquet/column/statistics.h"
-#include "parquet/encoding-internal.h"
-#include "parquet/util/logging.h"
-#include "parquet/util/memory.h"
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// ColumnWriter
-
-std::shared_ptr<WriterProperties> default_writer_properties() {
-  static std::shared_ptr<WriterProperties> default_writer_properties =
-      WriterProperties::Builder().build();
-  return default_writer_properties;
-}
-
-ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
-    std::unique_ptr<PageWriter> pager, int64_t expected_rows, bool has_dictionary,
-    Encoding::type encoding, const WriterProperties* properties)
-    : metadata_(metadata),
-      descr_(metadata->descr()),
-      pager_(std::move(pager)),
-      expected_rows_(expected_rows),
-      has_dictionary_(has_dictionary),
-      encoding_(encoding),
-      properties_(properties),
-      allocator_(properties->memory_pool()),
-      pool_(properties->memory_pool()),
-      num_buffered_values_(0),
-      num_buffered_encoded_values_(0),
-      num_rows_(0),
-      total_bytes_written_(0),
-      closed_(false),
-      fallback_(false) {
-  definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
-  repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
-  definition_levels_rle_ =
-      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
-  repetition_levels_rle_ =
-      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
-  uncompressed_data_ =
-      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
-  if (pager_->has_compressor()) {
-    compressed_data_ =
-        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
-  }
-}
-
-void ColumnWriter::InitSinks() {
-  definition_levels_sink_->Clear();
-  repetition_levels_sink_->Clear();
-}
-
-void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
-  DCHECK(!closed_);
-  definition_levels_sink_->Write(
-      reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
-}
-
-void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* levels) {
-  DCHECK(!closed_);
-  repetition_levels_sink_->Write(
-      reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
-}
-
-// return the size of the encoded buffer
-int64_t ColumnWriter::RleEncodeLevels(
-    const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level) {
-  // TODO: This only works with due to some RLE specifics
-  int64_t rle_size = LevelEncoder::MaxBufferSize(Encoding::RLE, max_level,
-                         static_cast<int>(num_buffered_values_)) +
-                     sizeof(int32_t);
-
-  // Use Arrow::Buffer::shrink_to_fit = false
-  // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
-  PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));
-
-  level_encoder_.Init(Encoding::RLE, max_level, static_cast<int>(num_buffered_values_),
-      dest_buffer->mutable_data() + sizeof(int32_t),
-      static_cast<int>(dest_buffer->size()) - sizeof(int32_t));
-  int encoded = level_encoder_.Encode(static_cast<int>(num_buffered_values_),
-      reinterpret_cast<const int16_t*>(src_buffer.data()));
-  DCHECK_EQ(encoded, num_buffered_values_);
-  reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
-  int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
-  return encoded_size;
-}
-
-void ColumnWriter::AddDataPage() {
-  int64_t definition_levels_rle_size = 0;
-  int64_t repetition_levels_rle_size = 0;
-
-  std::shared_ptr<Buffer> values = GetValuesBuffer();
-
-  if (descr_->max_definition_level() > 0) {
-    definition_levels_rle_size = RleEncodeLevels(definition_levels_sink_->GetBufferRef(),
-        definition_levels_rle_.get(), descr_->max_definition_level());
-  }
-
-  if (descr_->max_repetition_level() > 0) {
-    repetition_levels_rle_size = RleEncodeLevels(repetition_levels_sink_->GetBufferRef(),
-        repetition_levels_rle_.get(), descr_->max_repetition_level());
-  }
-
-  int64_t uncompressed_size =
-      definition_levels_rle_size + repetition_levels_rle_size + values->size();
-
-  // Use Arrow::Buffer::shrink_to_fit = false
-  // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
-  PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
-
-  // Concatenate data into a single buffer
-  uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data();
-  memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size);
-  uncompressed_ptr += repetition_levels_rle_size;
-  memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size);
-  uncompressed_ptr += definition_levels_rle_size;
-  memcpy(uncompressed_ptr, values->data(), values->size());
-
-  EncodedStatistics page_stats = GetPageStatistics();
-  ResetPageStatistics();
-
-  std::shared_ptr<Buffer> compressed_data;
-  if (pager_->has_compressor()) {
-    pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get());
-    compressed_data = compressed_data_;
-  } else {
-    compressed_data = uncompressed_data_;
-  }
-
-  // Write the page to OutputStream eagerly if there is no dictionary or
-  // if dictionary encoding has fallen back to PLAIN
-  if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
-    std::shared_ptr<Buffer> compressed_data_copy;
-    PARQUET_THROW_NOT_OK(compressed_data->Copy(
-        0, compressed_data->size(), allocator_, &compressed_data_copy));
-    CompressedDataPage page(compressed_data_copy,
-        static_cast<int32_t>(num_buffered_values_), encoding_, Encoding::RLE,
-        Encoding::RLE, uncompressed_size, page_stats);
-    data_pages_.push_back(std::move(page));
-  } else {  // Eagerly write pages
-    CompressedDataPage page(compressed_data, static_cast<int32_t>(num_buffered_values_),
-        encoding_, Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
-    WriteDataPage(page);
-  }
-
-  // Re-initialize the sinks for next Page.
-  InitSinks();
-  num_buffered_values_ = 0;
-  num_buffered_encoded_values_ = 0;
-}
-
-void ColumnWriter::WriteDataPage(const CompressedDataPage& page) {
-  total_bytes_written_ += pager_->WriteDataPage(page);
-}
-
-int64_t ColumnWriter::Close() {
-  if (!closed_) {
-    closed_ = true;
-    if (has_dictionary_ && !fallback_) { WriteDictionaryPage(); }
-
-    FlushBufferedDataPages();
-
-    EncodedStatistics chunk_statistics = GetChunkStatistics();
-    if (chunk_statistics.is_set()) metadata_->SetStatistics(chunk_statistics);
-    pager_->Close(has_dictionary_, fallback_);
-  }
-
-  if (num_rows_ != expected_rows_) {
-    std::stringstream ss;
-    ss << "Written rows: " << num_rows_ << " != expected rows: " << expected_rows_
-       << "in the current column chunk";
-    throw ParquetException(ss.str());
-  }
-
-  return total_bytes_written_;
-}
-
-void ColumnWriter::FlushBufferedDataPages() {
-  // Write all outstanding data to a new page
-  if (num_buffered_values_ > 0) { AddDataPage(); }
-  for (size_t i = 0; i < data_pages_.size(); i++) {
-    WriteDataPage(data_pages_[i]);
-  }
-  data_pages_.clear();
-}
-
-// ----------------------------------------------------------------------
-// TypedColumnWriter
-
-template <typename Type>
-TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
-    std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
-    const WriterProperties* properties)
-    : ColumnWriter(metadata, std::move(pager), expected_rows,
-          (encoding == Encoding::PLAIN_DICTIONARY ||
-              encoding == Encoding::RLE_DICTIONARY),
-          encoding, properties) {
-  switch (encoding) {
-    case Encoding::PLAIN:
-      current_encoder_.reset(new PlainEncoder<Type>(descr_, properties->memory_pool()));
-      break;
-    case Encoding::PLAIN_DICTIONARY:
-    case Encoding::RLE_DICTIONARY:
-      current_encoder_.reset(
-          new DictEncoder<Type>(descr_, &pool_, properties->memory_pool()));
-      break;
-    default:
-      ParquetException::NYI("Selected encoding is not supported");
-  }
-
-  if (properties->statistics_enabled(descr_->path())) {
-    page_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
-    chunk_statistics_ = std::unique_ptr<TypedStats>(new TypedStats(descr_, allocator_));
-  }
-}
-
-// Only one Dictionary Page is written.
-// Fallback to PLAIN if dictionary page limit is reached.
-template <typename Type>
-void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
-  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
-  if (dict_encoder->dict_encoded_size() >= properties_->dictionary_pagesize_limit()) {
-    WriteDictionaryPage();
-    // Serialize the buffered Dictionary Indicies
-    FlushBufferedDataPages();
-    fallback_ = true;
-    // Only PLAIN encoding is supported for fallback in V1
-    current_encoder_.reset(new PlainEncoder<Type>(descr_, properties_->memory_pool()));
-    encoding_ = Encoding::PLAIN;
-  }
-}
-
-template <typename Type>
-void TypedColumnWriter<Type>::WriteDictionaryPage() {
-  auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
-  std::shared_ptr<PoolBuffer> buffer =
-      AllocateBuffer(properties_->memory_pool(), dict_encoder->dict_encoded_size());
-  dict_encoder->WriteDict(buffer->mutable_data());
-  // TODO Get rid of this deep call
-  dict_encoder->mem_pool()->FreeAll();
-
-  DictionaryPage page(
-      buffer, dict_encoder->num_entries(), properties_->dictionary_index_encoding());
-  total_bytes_written_ += pager_->WriteDictionaryPage(page);
-}
-
-template <typename Type>
-EncodedStatistics TypedColumnWriter<Type>::GetPageStatistics() {
-  EncodedStatistics result;
-  if (page_statistics_) result = page_statistics_->Encode();
-  return result;
-}
-
-template <typename Type>
-EncodedStatistics TypedColumnWriter<Type>::GetChunkStatistics() {
-  EncodedStatistics result;
-  if (chunk_statistics_) result = chunk_statistics_->Encode();
-  return result;
-}
-
-template <typename Type>
-void TypedColumnWriter<Type>::ResetPageStatistics() {
-  if (chunk_statistics_ != nullptr) {
-    chunk_statistics_->Merge(*page_statistics_);
-    page_statistics_->Reset();
-  }
-}
-
-// ----------------------------------------------------------------------
-// Dynamic column writer constructor
-
-std::shared_ptr<ColumnWriter> ColumnWriter::Make(ColumnChunkMetaDataBuilder* metadata,
-    std::unique_ptr<PageWriter> pager, int64_t expected_rows,
-    const WriterProperties* properties) {
-  const ColumnDescriptor* descr = metadata->descr();
-  Encoding::type encoding = properties->encoding(descr->path());
-  if (properties->dictionary_enabled(descr->path()) &&
-      descr->physical_type() != Type::BOOLEAN) {
-    encoding = properties->dictionary_page_encoding();
-  }
-  switch (descr->physical_type()) {
-    case Type::BOOLEAN:
-      return std::make_shared<BoolWriter>(
-          metadata, std::move(pager), expected_rows, encoding, properties);
-    case Type::INT32:
-      return std::make_shared<Int32Writer>(
-          metadata, std::move(pager), expected_rows, encoding, properties);
-    case Type::INT64:
-      return std::make_shared<Int64Writer>(
-          metadata, std::move(pager), expected_rows, encoding, properties);
-    case Type::INT96:
-      return std::make_shared<Int96Writer>(
-          metadata, std::move(pager), expected_rows, encoding, properties);
-    case Type::FLOAT:
-      return std::make_shared<FloatWriter>(
-          metadata, std::move(pager), expected_rows, encoding, properties);
-    case Type::DOUBLE:
-      return std::make_shared<DoubleWriter>(
-          metadata, std::move(pager), expected_rows, encoding, properties);
-    case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayWriter>(
-          metadata, std::move(pager), expected_rows, encoding, properties);
-    case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayWriter>(
-          metadata, std::move(pager), expected_rows, encoding, properties);
-    default:
-      ParquetException::NYI("type reader not implemented");
-  }
-  // Unreachable code, but supress compiler warning
-  return std::shared_ptr<ColumnWriter>(nullptr);
-}
-
-// ----------------------------------------------------------------------
-// Instantiate templated classes
-
-template <typename DType>
-inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
-    const int16_t* def_levels, const int16_t* rep_levels, const T* values) {
-  int64_t values_to_write = 0;
-  // If the field is required and non-repeated, there are no definition levels
-  if (descr_->max_definition_level() > 0) {
-    for (int64_t i = 0; i < num_values; ++i) {
-      if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
-    }
-
-    WriteDefinitionLevels(num_values, def_levels);
-  } else {
-    // Required field, write all values
-    values_to_write = num_values;
-  }
-
-  // Not present for non-repeated fields
-  if (descr_->max_repetition_level() > 0) {
-    // A row could include more than one value
-    // Count the occasions where we start a new row
-    for (int64_t i = 0; i < num_values; ++i) {
-      if (rep_levels[i] == 0) { num_rows_++; }
-    }
-
-    WriteRepetitionLevels(num_values, rep_levels);
-  } else {
-    // Each value is exactly one row
-    num_rows_ += static_cast<int>(num_values);
-  }
-
-  if (num_rows_ > expected_rows_) {
-    throw ParquetException("More rows were written in the column chunk than expected");
-  }
-
-  // PARQUET-780
-  if (values_to_write > 0) { DCHECK(nullptr != values) << "Values ptr cannot be NULL"; }
-
-  WriteValues(values_to_write, values);
-
-  if (page_statistics_ != nullptr) {
-    page_statistics_->Update(values, values_to_write, num_values - values_to_write);
-  }
-
-  num_buffered_values_ += num_values;
-  num_buffered_encoded_values_ += values_to_write;
-
-  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
-    AddDataPage();
-  }
-  if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
-
-  return values_to_write;
-}
-
-template <typename DType>
-inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values,
-    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
-    int64_t valid_bits_offset, const T* values, int64_t* num_spaced_written) {
-  int64_t values_to_write = 0;
-  int64_t spaced_values_to_write = 0;
-  // If the field is required and non-repeated, there are no definition levels
-  if (descr_->max_definition_level() > 0) {
-    // Minimal definition level for which spaced values are written
-    int16_t min_spaced_def_level = descr_->max_definition_level();
-    if (descr_->schema_node()->is_optional()) { min_spaced_def_level--; }
-    for (int64_t i = 0; i < num_values; ++i) {
-      if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
-      if (def_levels[i] >= min_spaced_def_level) { ++spaced_values_to_write; }
-    }
-
-    WriteDefinitionLevels(num_values, def_levels);
-  } else {
-    // Required field, write all values
-    values_to_write = num_values;
-    spaced_values_to_write = num_values;
-  }
-
-  // Not present for non-repeated fields
-  if (descr_->max_repetition_level() > 0) {
-    // A row could include more than one value
-    // Count the occasions where we start a new row
-    for (int64_t i = 0; i < num_values; ++i) {
-      if (rep_levels[i] == 0) { num_rows_++; }
-    }
-
-    WriteRepetitionLevels(num_values, rep_levels);
-  } else {
-    // Each value is exactly one row
-    num_rows_ += static_cast<int>(num_values);
-  }
-
-  if (num_rows_ > expected_rows_) {
-    throw ParquetException("More rows were written in the column chunk than expected");
-  }
-
-  if (descr_->schema_node()->is_optional()) {
-    WriteValuesSpaced(spaced_values_to_write, valid_bits, valid_bits_offset, values);
-  } else {
-    WriteValues(values_to_write, values);
-  }
-  *num_spaced_written = spaced_values_to_write;
-
-  if (page_statistics_ != nullptr) {
-    page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write,
-        num_values - values_to_write);
-  }
-
-  num_buffered_values_ += num_values;
-  num_buffered_encoded_values_ += values_to_write;
-
-  if (current_encoder_->EstimatedDataEncodedSize() >= properties_->data_pagesize()) {
-    AddDataPage();
-  }
-  if (has_dictionary_ && !fallback_) { CheckDictionarySizeLimit(); }
-
-  return values_to_write;
-}
-
-template <typename DType>
-void TypedColumnWriter<DType>::WriteBatch(int64_t num_values, const int16_t* def_levels,
-    const int16_t* rep_levels, const T* values) {
-  // We check for DataPage limits only after we have inserted the values. If a user
-  // writes a large number of values, the DataPage size can be much above the limit.
-  // The purpose of this chunking is to bound this. Even if a user writes large number
-  // of values, the chunking will ensure the AddDataPage() is called at a reasonable
-  // pagesize limit
-  int64_t write_batch_size = properties_->write_batch_size();
-  int num_batches = static_cast<int>(num_values / write_batch_size);
-  int64_t num_remaining = num_values % write_batch_size;
-  int64_t value_offset = 0;
-  for (int round = 0; round < num_batches; round++) {
-    int64_t offset = round * write_batch_size;
-    int64_t num_values = WriteMiniBatch(write_batch_size, &def_levels[offset],
-        &rep_levels[offset], &values[value_offset]);
-    value_offset += num_values;
-  }
-  // Write the remaining values
-  int64_t offset = num_batches * write_batch_size;
-  WriteMiniBatch(
-      num_remaining, &def_levels[offset], &rep_levels[offset], &values[value_offset]);
-}
-
-template <typename DType>
-void TypedColumnWriter<DType>::WriteBatchSpaced(int64_t num_values,
-    const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
-    int64_t valid_bits_offset, const T* values) {
-  // We check for DataPage limits only after we have inserted the values. If a user
-  // writes a large number of values, the DataPage size can be much above the limit.
-  // The purpose of this chunking is to bound this. Even if a user writes large number
-  // of values, the chunking will ensure the AddDataPage() is called at a reasonable
-  // pagesize limit
-  int64_t write_batch_size = properties_->write_batch_size();
-  int num_batches = static_cast<int>(num_values / write_batch_size);
-  int64_t num_remaining = num_values % write_batch_size;
-  int64_t num_spaced_written = 0;
-  int64_t values_offset = 0;
-  for (int round = 0; round < num_batches; round++) {
-    int64_t offset = round * write_batch_size;
-    WriteMiniBatchSpaced(write_batch_size, &def_levels[offset], &rep_levels[offset],
-        valid_bits, valid_bits_offset + values_offset, values + values_offset,
-        &num_spaced_written);
-    values_offset += num_spaced_written;
-  }
-  // Write the remaining values
-  int64_t offset = num_batches * write_batch_size;
-  WriteMiniBatchSpaced(num_remaining, &def_levels[offset], &rep_levels[offset],
-      valid_bits, valid_bits_offset + values_offset, values + values_offset,
-      &num_spaced_written);
-}
-
-template <typename DType>
-void TypedColumnWriter<DType>::WriteValues(int64_t num_values, const T* values) {
-  current_encoder_->Put(values, static_cast<int>(num_values));
-}
-
-template <typename DType>
-void TypedColumnWriter<DType>::WriteValuesSpaced(int64_t num_values,
-    const uint8_t* valid_bits, int64_t valid_bits_offset, const T* values) {
-  current_encoder_->PutSpaced(
-      values, static_cast<int>(num_values), valid_bits, valid_bits_offset);
-}
-
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<BooleanType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int32Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int64Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<Int96Type>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FloatType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<DoubleType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<ByteArrayType>;
-template class PARQUET_TEMPLATE_EXPORT TypedColumnWriter<FLBAType>;
-
-}  // namespace parquet


Mime
View raw message