parquet-commits mailing list archives

From w...@apache.org
Subject [1/2] parquet-cpp git commit: PARQUET-1100: Introduce RecordReader interface to better support nested data, refactor parquet/arrow/reader
Date Wed, 20 Sep 2017 01:39:00 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 18ca3922e -> 4b09ac703


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/record_reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/record_reader.cc b/src/parquet/arrow/record_reader.cc
new file mode 100644
index 0000000..7275d2f
--- /dev/null
+++ b/src/parquet/arrow/record_reader.cc
@@ -0,0 +1,807 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/arrow/record_reader.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+#include <sstream>
+
+#include <arrow/buffer.h>
+#include <arrow/memory_pool.h>
+#include <arrow/status.h>
+#include <arrow/util/bit-util.h>
+#include <arrow/util/rle-encoding.h>
+
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/encoding-internal.h"
+#include "parquet/exception.h"
+#include "parquet/properties.h"
+
+using arrow::MemoryPool;
+
+namespace parquet {
+namespace internal {
+
+namespace BitUtil = ::arrow::BitUtil;
+
+template <typename DType>
+class TypedRecordReader;
+
+// PLAIN_DICTIONARY is deprecated, but it was historically used as the
+// dictionary index encoding.
+static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
+  return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
+}
+
+class RecordReader::RecordReaderImpl {
+ public:
+  RecordReaderImpl(const ColumnDescriptor* descr, MemoryPool* pool)
+      : descr_(descr),
+        pool_(pool),
+        num_buffered_values_(0),
+        num_decoded_values_(0),
+        max_def_level_(descr->max_definition_level()),
+        max_rep_level_(descr->max_repetition_level()),
+        at_record_start_(false),
+        records_read_(0),
+        values_written_(0),
+        values_capacity_(0),
+        null_count_(0),
+        levels_written_(0),
+        levels_position_(0),
+        levels_capacity_(0) {
+    nullable_values_ = internal::HasSpacedValues(descr);
+    values_ = std::make_shared<PoolBuffer>(pool);
+    valid_bits_ = std::make_shared<PoolBuffer>(pool);
+    def_levels_ = std::make_shared<PoolBuffer>(pool);
+    rep_levels_ = std::make_shared<PoolBuffer>(pool);
+
+    if (descr->physical_type() == Type::BYTE_ARRAY) {
+      builder_.reset(new ::arrow::BinaryBuilder(pool));
+    } else if (descr->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
+      int byte_width = descr->type_length();
+      std::shared_ptr<::arrow::DataType> type = ::arrow::fixed_size_binary(byte_width);
+      builder_.reset(new ::arrow::FixedSizeBinaryBuilder(type, pool));
+    }
+    Reset();
+  }
+
+  virtual ~RecordReaderImpl() {}
+
+  virtual int64_t ReadRecords(int64_t num_records) = 0;
+
+  // Dictionary decoders must be reset when advancing row groups
+  virtual void ResetDecoders() = 0;
+
+  void SetPageReader(std::unique_ptr<PageReader> reader) {
+    pager_ = std::move(reader);
+    ResetDecoders();
+  }
+
+  bool HasMoreData() const { return pager_ != nullptr; }
+
+  int16_t* def_levels() const {
+    return reinterpret_cast<int16_t*>(def_levels_->mutable_data());
+  }
+
+  int16_t* rep_levels() {
+    return reinterpret_cast<int16_t*>(rep_levels_->mutable_data());
+  }
+
+  uint8_t* values() const { return values_->mutable_data(); }
+
+  /// \brief Number of values written including nulls (if any)
+  int64_t values_written() const { return values_written_; }
+
+  int64_t levels_position() const { return levels_position_; }
+  int64_t levels_written() const { return levels_written_; }
+
+  // We may outwardly have the appearance of having exhausted a column chunk
+  // when in fact we are in the middle of processing the last batch
+  bool has_values_to_process() const { return levels_position_ < levels_written_; }
+
+  int64_t null_count() const { return null_count_; }
+
+  bool nullable_values() const { return nullable_values_; }
+
+  std::shared_ptr<PoolBuffer> ReleaseValues() {
+    auto result = values_;
+    values_ = std::make_shared<PoolBuffer>(pool_);
+    return result;
+  }
+
+  std::shared_ptr<PoolBuffer> ReleaseIsValid() {
+    auto result = valid_bits_;
+    valid_bits_ = std::make_shared<PoolBuffer>(pool_);
+    return result;
+  }
+
+  ::arrow::ArrayBuilder* builder() { return builder_.get(); }
+
+  // Process written repetition/definition levels to reach the end of
+  // records. Process no more levels than necessary to delimit the indicated
+  // number of logical records. Updates internal state of RecordReader
+  //
+  // \return Number of records delimited
+  int64_t DelimitRecords(int64_t num_records, int64_t* values_seen) {
+    int64_t values_to_read = 0;
+    int64_t records_read = 0;
+
+    const int16_t* def_levels = this->def_levels() + levels_position_;
+    const int16_t* rep_levels = this->rep_levels() + levels_position_;
+
+    DCHECK_GT(max_rep_level_, 0);
+
+    // Count logical records and number of values to read
+    while (levels_position_ < levels_written_) {
+      if (*rep_levels++ == 0) {
+        at_record_start_ = true;
+        if (records_read == num_records) {
+          // We've found the number of records we were looking for
+          break;
+        } else {
+          // Continue
+          ++records_read;
+        }
+      } else {
+        at_record_start_ = false;
+      }
+      if (*def_levels++ == max_def_level_) {
+        ++values_to_read;
+      }
+      ++levels_position_;
+    }
+    *values_seen = values_to_read;
+    return records_read;
+  }
+
+  // Read multiple definition levels into preallocated memory
+  //
+  // Returns the number of decoded definition levels
+  int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
+    if (descr_->max_definition_level() == 0) {
+      return 0;
+    }
+    return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
+  }
+
+  int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
+    if (descr_->max_repetition_level() == 0) {
+      return 0;
+    }
+    return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
+  }
+
+  int64_t available_values_current_page() const {
+    return num_buffered_values_ - num_decoded_values_;
+  }
+
+  void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
+
+  Type::type type() const { return descr_->physical_type(); }
+
+  const ColumnDescriptor* descr() const { return descr_; }
+
+  void Reserve(int64_t capacity) {
+    ReserveLevels(capacity);
+    ReserveValues(capacity);
+  }
+
+  void ReserveLevels(int64_t capacity) {
+    if (descr_->max_definition_level() > 0 &&
+        (levels_written_ + capacity > levels_capacity_)) {
+      int64_t new_levels_capacity = BitUtil::NextPower2(levels_capacity_ + 1);
+      while (levels_written_ + capacity > new_levels_capacity) {
+        new_levels_capacity = BitUtil::NextPower2(new_levels_capacity + 1);
+      }
+      PARQUET_THROW_NOT_OK(
+          def_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
+      if (descr_->max_repetition_level() > 0) {
+        PARQUET_THROW_NOT_OK(
+            rep_levels_->Resize(new_levels_capacity * sizeof(int16_t), false));
+      }
+      levels_capacity_ = new_levels_capacity;
+    }
+  }
+
+  void ReserveValues(int64_t capacity) {
+    if (values_written_ + capacity > values_capacity_) {
+      int64_t new_values_capacity = BitUtil::NextPower2(values_capacity_ + 1);
+      while (values_written_ + capacity > new_values_capacity) {
+        new_values_capacity = BitUtil::NextPower2(new_values_capacity + 1);
+      }
+
+      int type_size = GetTypeByteSize(descr_->physical_type());
+      PARQUET_THROW_NOT_OK(values_->Resize(new_values_capacity * type_size, false));
+      values_capacity_ = new_values_capacity;
+    }
+    if (nullable_values_) {
+      int64_t valid_bytes_new = BitUtil::BytesForBits(values_capacity_);
+      if (valid_bits_->size() < valid_bytes_new) {
+        int64_t valid_bytes_old = BitUtil::BytesForBits(values_written_);
+        PARQUET_THROW_NOT_OK(valid_bits_->Resize(valid_bytes_new, false));
+
+        // Avoid valgrind warnings
+        memset(valid_bits_->mutable_data() + valid_bytes_old, 0,
+               valid_bytes_new - valid_bytes_old);
+      }
+    }
+  }
+
+  void Reset() {
+    ResetValues();
+
+    if (levels_written_ > 0) {
+      const int64_t levels_remaining = levels_written_ - levels_position_;
+      // Shift remaining levels to beginning of buffer and trim to only the number
+      // of decoded levels remaining
+      int16_t* def_data = def_levels();
+      int16_t* rep_data = rep_levels();
+
+      std::copy(def_data + levels_position_, def_data + levels_written_, def_data);
+      std::copy(rep_data + levels_position_, rep_data + levels_written_, rep_data);
+
+      PARQUET_THROW_NOT_OK(
+          def_levels_->Resize(levels_remaining * sizeof(int16_t), false));
+      PARQUET_THROW_NOT_OK(
+          rep_levels_->Resize(levels_remaining * sizeof(int16_t), false));
+
+      levels_written_ -= levels_position_;
+      levels_position_ = 0;
+      levels_capacity_ = levels_remaining;
+    }
+
+    records_read_ = 0;
+
+    // Calling Finish on the builders also resets them
+  }
+
+  void ResetValues() {
+    if (values_written_ > 0) {
+      // Resize to 0, but do not shrink to fit
+      PARQUET_THROW_NOT_OK(values_->Resize(0, false));
+      PARQUET_THROW_NOT_OK(valid_bits_->Resize(0, false));
+      values_written_ = 0;
+      values_capacity_ = 0;
+      null_count_ = 0;
+    }
+  }
+
+ protected:
+  const ColumnDescriptor* descr_;
+  ::arrow::MemoryPool* pool_;
+
+  std::unique_ptr<PageReader> pager_;
+  std::shared_ptr<Page> current_page_;
+
+  // Not set if full schema for this field has no optional or repeated elements
+  LevelDecoder definition_level_decoder_;
+
+  // Not set for flat schemas.
+  LevelDecoder repetition_level_decoder_;
+
+  // The total number of values stored in the data page. This is the maximum of
+  // the number of encoded definition levels or encoded values. For
+  // non-repeated, required columns, this is equal to the number of encoded
+  // values. For repeated or optional values, there may be fewer data values
+  // than levels, and this tells you how many encoded levels there are in that
+  // case.
+  int64_t num_buffered_values_;
+
+  // The number of values from the current data page that have been decoded
+  // into memory
+  int64_t num_decoded_values_;
+
+  const int max_def_level_;
+  const int max_rep_level_;
+
+  bool nullable_values_;
+
+  bool at_record_start_;
+  int64_t records_read_;
+
+  int64_t values_written_;
+  int64_t values_capacity_;
+  int64_t null_count_;
+
+  int64_t levels_written_;
+  int64_t levels_position_;
+  int64_t levels_capacity_;
+
+  // TODO(wesm): ByteArray / FixedLenByteArray types
+  std::unique_ptr<::arrow::ArrayBuilder> builder_;
+
+  std::shared_ptr<::arrow::PoolBuffer> values_;
+
+  template <typename T>
+  T* ValuesHead() {
+    return reinterpret_cast<T*>(values_->mutable_data()) + values_written_;
+  }
+
+  std::shared_ptr<::arrow::PoolBuffer> valid_bits_;
+  std::shared_ptr<::arrow::PoolBuffer> def_levels_;
+  std::shared_ptr<::arrow::PoolBuffer> rep_levels_;
+};
+
+// The minimum number of repetition/definition levels to decode at a time, for
+// better vectorized performance when doing many smaller record reads
+constexpr int64_t kMinLevelBatchSize = 1024;
+
+template <typename DType>
+class TypedRecordReader : public RecordReader::RecordReaderImpl {
+ public:
+  typedef typename DType::c_type T;
+
+  ~TypedRecordReader() {}
+
+  TypedRecordReader(const ColumnDescriptor* schema, ::arrow::MemoryPool* pool)
+      : RecordReader::RecordReaderImpl(schema, pool), current_decoder_(nullptr) {}
+
+  void ResetDecoders() override { decoders_.clear(); }
+
+  inline void ReadValuesSpaced(int64_t values_with_nulls, int64_t null_count) {
+    uint8_t* valid_bits = valid_bits_->mutable_data();
+    const int64_t valid_bits_offset = values_written_;
+
+    int64_t num_decoded = current_decoder_->DecodeSpaced(
+        ValuesHead<T>(), static_cast<int>(values_with_nulls),
+        static_cast<int>(null_count), valid_bits, valid_bits_offset);
+    DCHECK_EQ(num_decoded, values_with_nulls);
+  }
+
+  inline void ReadValuesDense(int64_t values_to_read) {
+    int64_t num_decoded =
+        current_decoder_->Decode(ValuesHead<T>(), static_cast<int>(values_to_read));
+    DCHECK_EQ(num_decoded, values_to_read);
+  }
+
+  // Return number of logical records read
+  int64_t ReadRecordData(const int64_t num_records) {
+    // Conservative upper bound
+    const int64_t possible_num_values =
+        std::max(num_records, levels_written_ - levels_position_);
+    ReserveValues(possible_num_values);
+
+    const int64_t start_levels_position = levels_position_;
+
+    int64_t values_to_read = 0;
+    int64_t records_read = 0;
+    if (max_rep_level_ > 0) {
+      records_read = DelimitRecords(num_records, &values_to_read);
+    } else if (max_def_level_ > 0) {
+      // No repetition levels, skip delimiting logic. Each level represents a
+      // null or not null entry
+      records_read = std::min(levels_written_ - levels_position_, num_records);
+
+      // This is advanced by DelimitRecords, which we skipped
+      levels_position_ += records_read;
+    } else {
+      records_read = values_to_read = num_records;
+    }
+
+    int64_t null_count = 0;
+    if (nullable_values_) {
+      int64_t values_with_nulls = 0;
+      internal::DefinitionLevelsToBitmap(
+          def_levels() + start_levels_position, levels_position_ - start_levels_position,
+          max_def_level_, max_rep_level_, &values_with_nulls, &null_count,
+          valid_bits_->mutable_data(), values_written_);
+      values_to_read = values_with_nulls - null_count;
+      ReadValuesSpaced(values_with_nulls, null_count);
+      ConsumeBufferedValues(levels_position_ - start_levels_position);
+    } else {
+      ReadValuesDense(values_to_read);
+      ConsumeBufferedValues(values_to_read);
+    }
+    // Total values, including null spaces, if any
+    values_written_ += values_to_read + null_count;
+    null_count_ += null_count;
+
+    return records_read;
+  }
+
+  // Returns true if there are still values in this column.
+  bool HasNext() {
+    // Either there is no data page available yet, or the data page has been
+    // exhausted
+    if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
+      if (!ReadNewPage() || num_buffered_values_ == 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  int64_t ReadRecords(int64_t num_records) override {
+    // Delimit records, then read values at the end
+    int64_t records_read = 0;
+
+    if (levels_position_ < levels_written_) {
+      records_read += ReadRecordData(num_records);
+    }
+
+    // HasNext invokes ReadNewPage
+    if (records_read == 0 && !HasNext()) {
+      return 0;
+    }
+
+    int64_t level_batch_size = std::max(kMinLevelBatchSize, num_records);
+
+    // If we are in the middle of a record, keep reading until we have both
+    // read the desired number of records and arrived back at the start of a
+    // record
+    while (!at_record_start_ || records_read < num_records) {
+      // Is there more data to read in this row group?
+      if (!HasNext()) {
+        break;
+      }
+
+      // We perform multiple batch reads until we either exhaust the row group
+      // or observe the desired number of records
+      int64_t batch_size = std::min(level_batch_size, available_values_current_page());
+
+      // No more data in column
+      if (batch_size == 0) {
+        break;
+      }
+
+      if (max_def_level_ > 0) {
+        ReserveLevels(batch_size);
+
+        int16_t* def_levels = this->def_levels() + levels_written_;
+        int16_t* rep_levels = this->rep_levels() + levels_written_;
+
+        // Not present for non-repeated fields
+        int64_t levels_read = 0;
+        if (max_rep_level_ > 0) {
+          levels_read = ReadDefinitionLevels(batch_size, def_levels);
+          if (ReadRepetitionLevels(batch_size, rep_levels) != levels_read) {
+            throw ParquetException("Number of decoded rep / def levels did not match");
+          }
+        } else if (max_def_level_ > 0) {
+          levels_read = ReadDefinitionLevels(batch_size, def_levels);
+        }
+
+        // Exhausted column chunk
+        if (levels_read == 0) {
+          break;
+        }
+
+        levels_written_ += levels_read;
+        records_read += ReadRecordData(num_records - records_read);
+      } else {
+        // No repetition or definition levels
+        batch_size = std::min(num_records - records_read, batch_size);
+        records_read += ReadRecordData(batch_size);
+      }
+    }
+
+    return records_read;
+  }
+
+ private:
+  typedef Decoder<DType> DecoderType;
+
+  // Map of encoding type to the respective decoder object. For example, a
+  // column chunk's data pages may include both dictionary-encoded and
+  // plain-encoded data.
+  std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;
+
+  DecoderType* current_decoder_;
+
+  // Advance to the next data page
+  bool ReadNewPage();
+
+  void ConfigureDictionary(const DictionaryPage* page);
+};
+
+template <>
+inline void TypedRecordReader<ByteArrayType>::ReadValuesDense(int64_t values_to_read) {
+  auto values = ValuesHead<ByteArray>();
+  int64_t num_decoded =
+      current_decoder_->Decode(values, static_cast<int>(values_to_read));
+  DCHECK_EQ(num_decoded, values_to_read);
+
+  auto builder = static_cast<::arrow::BinaryBuilder*>(builder_.get());
+  for (int64_t i = 0; i < num_decoded; i++) {
+    PARQUET_THROW_NOT_OK(
+        builder->Append(values[i].ptr, static_cast<int64_t>(values[i].len)));
+  }
+  ResetValues();
+}
+
+template <>
+inline void TypedRecordReader<FLBAType>::ReadValuesDense(int64_t values_to_read) {
+  auto values = ValuesHead<FLBA>();
+  int64_t num_decoded =
+      current_decoder_->Decode(values, static_cast<int>(values_to_read));
+  DCHECK_EQ(num_decoded, values_to_read);
+
+  auto builder = static_cast<::arrow::FixedSizeBinaryBuilder*>(builder_.get());
+  for (int64_t i = 0; i < num_decoded; i++) {
+    PARQUET_THROW_NOT_OK(builder->Append(values[i].ptr));
+  }
+  ResetValues();
+}
+
+template <>
+inline void TypedRecordReader<ByteArrayType>::ReadValuesSpaced(int64_t values_to_read,
+                                                               int64_t null_count) {
+  uint8_t* valid_bits = valid_bits_->mutable_data();
+  const int64_t valid_bits_offset = values_written_;
+  auto values = ValuesHead<ByteArray>();
+
+  int64_t num_decoded = current_decoder_->DecodeSpaced(
+      values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits,
+      valid_bits_offset);
+  DCHECK_EQ(num_decoded, values_to_read);
+
+  auto builder = static_cast<::arrow::BinaryBuilder*>(builder_.get());
+
+  for (int64_t i = 0; i < num_decoded; i++) {
+    if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
+      PARQUET_THROW_NOT_OK(
+          builder->Append(values[i].ptr, static_cast<int64_t>(values[i].len)));
+    } else {
+      PARQUET_THROW_NOT_OK(builder->AppendNull());
+    }
+  }
+  ResetValues();
+}
+
+template <>
+inline void TypedRecordReader<FLBAType>::ReadValuesSpaced(int64_t values_to_read,
+                                                          int64_t null_count) {
+  uint8_t* valid_bits = valid_bits_->mutable_data();
+  const int64_t valid_bits_offset = values_written_;
+  auto values = ValuesHead<FLBA>();
+
+  int64_t num_decoded = current_decoder_->DecodeSpaced(
+      values, static_cast<int>(values_to_read), static_cast<int>(null_count), valid_bits,
+      valid_bits_offset);
+  DCHECK_EQ(num_decoded, values_to_read);
+
+  auto builder = static_cast<::arrow::FixedSizeBinaryBuilder*>(builder_.get());
+  for (int64_t i = 0; i < num_decoded; i++) {
+    if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
+      PARQUET_THROW_NOT_OK(builder->Append(values[i].ptr));
+    } else {
+      PARQUET_THROW_NOT_OK(builder->AppendNull());
+    }
+  }
+  ResetValues();
+}
+
+template <typename DType>
+inline void TypedRecordReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
+  int encoding = static_cast<int>(page->encoding());
+  if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
+      page->encoding() == Encoding::PLAIN) {
+    encoding = static_cast<int>(Encoding::RLE_DICTIONARY);
+  }
+
+  auto it = decoders_.find(encoding);
+  if (it != decoders_.end()) {
+    throw ParquetException("Column cannot have more than one dictionary.");
+  }
+
+  if (page->encoding() == Encoding::PLAIN_DICTIONARY ||
+      page->encoding() == Encoding::PLAIN) {
+    PlainDecoder<DType> dictionary(descr_);
+    dictionary.SetData(page->num_values(), page->data(), page->size());
+
+    // The dictionary is fully decoded during DictionaryDecoder::Init, so the
+    // DictionaryPage buffer is no longer required after this step
+    //
+    // TODO(wesm): investigate whether this all-or-nothing decoding of the
+    // dictionary makes sense and whether performance can be improved
+
+    auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
+    decoder->SetDict(&dictionary);
+    decoders_[encoding] = decoder;
+  } else {
+    ParquetException::NYI("only plain dictionary encoding has been implemented");
+  }
+
+  current_decoder_ = decoders_[encoding].get();
+}
+
+template <typename DType>
+bool TypedRecordReader<DType>::ReadNewPage() {
+  // Loop until we find the next data page.
+  const uint8_t* buffer;
+
+  while (true) {
+    current_page_ = pager_->NextPage();
+    if (!current_page_) {
+      // EOS
+      return false;
+    }
+
+    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
+      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
+      continue;
+    } else if (current_page_->type() == PageType::DATA_PAGE) {
+      const DataPage* page = static_cast<const DataPage*>(current_page_.get());
+
+      // Read a data page.
+      num_buffered_values_ = page->num_values();
+
+      // Have not decoded any values from the data page yet
+      num_decoded_values_ = 0;
+
+      buffer = page->data();
+
+      // If the data page includes repetition and definition levels, we
+      // initialize the level decoder and subtract the encoded level bytes from
+      // the page size to determine the number of bytes in the encoded data.
+      int64_t data_size = page->size();
+
+      // Data page layout: repetition levels, then definition levels, then
+      // encoded values. Levels are encoded as RLE or bit-packed.
+      // Init repetition levels
+      if (descr_->max_repetition_level() > 0) {
+        int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
+            page->repetition_level_encoding(), descr_->max_repetition_level(),
+            static_cast<int>(num_buffered_values_), buffer);
+        buffer += rep_levels_bytes;
+        data_size -= rep_levels_bytes;
+      }
+      // TODO: figure out a way to set max_definition_level_ to 0
+      // if the initial value is invalid
+
+      // Init definition levels
+      if (descr_->max_definition_level() > 0) {
+        int64_t def_levels_bytes = definition_level_decoder_.SetData(
+            page->definition_level_encoding(), descr_->max_definition_level(),
+            static_cast<int>(num_buffered_values_), buffer);
+        buffer += def_levels_bytes;
+        data_size -= def_levels_bytes;
+      }
+
+      // Get a decoder object for this page or create a new decoder if this is the
+      // first page with this encoding.
+      Encoding::type encoding = page->encoding();
+
+      if (IsDictionaryIndexEncoding(encoding)) {
+        encoding = Encoding::RLE_DICTIONARY;
+      }
+
+      auto it = decoders_.find(static_cast<int>(encoding));
+      if (it != decoders_.end()) {
+        if (encoding == Encoding::RLE_DICTIONARY) {
+          DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
+        }
+        current_decoder_ = it->second.get();
+      } else {
+        switch (encoding) {
+          case Encoding::PLAIN: {
+            std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
+            decoders_[static_cast<int>(encoding)] = decoder;
+            current_decoder_ = decoder.get();
+            break;
+          }
+          case Encoding::RLE_DICTIONARY:
+            throw ParquetException("Dictionary page must be before data page.");
+
+          case Encoding::DELTA_BINARY_PACKED:
+          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+          case Encoding::DELTA_BYTE_ARRAY:
+            ParquetException::NYI("Unsupported encoding");
+
+          default:
+            throw ParquetException("Unknown encoding type.");
+        }
+      }
+      current_decoder_->SetData(static_cast<int>(num_buffered_values_), buffer,
+                                static_cast<int>(data_size));
+      return true;
+    } else {
+      // We don't know what this page type is. We're allowed to skip non-data
+      // pages.
+      continue;
+    }
+  }
+  return true;
+}
+
+std::shared_ptr<RecordReader> RecordReader::Make(const ColumnDescriptor* descr,
+                                                 MemoryPool* pool) {
+  switch (descr->physical_type()) {
+    case Type::BOOLEAN:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<BooleanType>(descr, pool)));
+    case Type::INT32:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<Int32Type>(descr, pool)));
+    case Type::INT64:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<Int64Type>(descr, pool)));
+    case Type::INT96:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<Int96Type>(descr, pool)));
+    case Type::FLOAT:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<FloatType>(descr, pool)));
+    case Type::DOUBLE:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<DoubleType>(descr, pool)));
+    case Type::BYTE_ARRAY:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<ByteArrayType>(descr, pool)));
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::shared_ptr<RecordReader>(
+          new RecordReader(new TypedRecordReader<FLBAType>(descr, pool)));
+    default:
+      DCHECK(false);
+  }
+  // Unreachable code, but suppress compiler warning
+  return nullptr;
+}
+
+// ----------------------------------------------------------------------
+// Implement public API
+
+RecordReader::RecordReader(RecordReaderImpl* impl) { impl_.reset(impl); }
+
+RecordReader::~RecordReader() {}
+
+int64_t RecordReader::ReadRecords(int64_t num_records) {
+  return impl_->ReadRecords(num_records);
+}
+
+void RecordReader::Reset() { return impl_->Reset(); }
+
+void RecordReader::Reserve(int64_t num_values) { impl_->Reserve(num_values); }
+
+const int16_t* RecordReader::def_levels() const { return impl_->def_levels(); }
+
+const int16_t* RecordReader::rep_levels() const { return impl_->rep_levels(); }
+
+const uint8_t* RecordReader::values() const { return impl_->values(); }
+
+std::shared_ptr<PoolBuffer> RecordReader::ReleaseValues() {
+  return impl_->ReleaseValues();
+}
+
+std::shared_ptr<PoolBuffer> RecordReader::ReleaseIsValid() {
+  return impl_->ReleaseIsValid();
+}
+
+::arrow::ArrayBuilder* RecordReader::builder() { return impl_->builder(); }
+
+int64_t RecordReader::values_written() const { return impl_->values_written(); }
+
+int64_t RecordReader::levels_position() const { return impl_->levels_position(); }
+
+int64_t RecordReader::levels_written() const { return impl_->levels_written(); }
+
+int64_t RecordReader::null_count() const { return impl_->null_count(); }
+
+bool RecordReader::nullable_values() const { return impl_->nullable_values(); }
+
+bool RecordReader::HasMoreData() const { return impl_->HasMoreData(); }
+
+void RecordReader::SetPageReader(std::unique_ptr<PageReader> reader) {
+  impl_->SetPageReader(std::move(reader));
+}
+
+}  // namespace internal
+}  // namespace parquet
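
A minimal standalone sketch of the delimiting rule implemented by
RecordReaderImpl::DelimitRecords above: a repetition level of 0 marks the start
of a new record, and a definition level equal to the maximum marks a non-null
leaf value to decode. The column, level values, and record count below are
illustrative assumptions, not taken from the patch.

// Standalone illustration (not part of this commit) of record delimiting.
#include <cstdint>
#include <iostream>
#include <vector>

int64_t DelimitRecordsSketch(const std::vector<int16_t>& rep_levels,
                             const std::vector<int16_t>& def_levels,
                             int16_t max_def_level, int64_t num_records,
                             int64_t* values_to_read) {
  int64_t records_read = 0;
  *values_to_read = 0;
  for (size_t i = 0; i < rep_levels.size(); ++i) {
    if (rep_levels[i] == 0) {
      // Repetition level 0: a new record starts at this level position
      if (records_read == num_records) break;
      ++records_read;
    }
    if (def_levels[i] == max_def_level) {
      // Leaf value is present (not null) and must be decoded
      ++(*values_to_read);
    }
  }
  return records_read;
}

int main() {
  // Hypothetical repeated column with max_rep_level = 1, max_def_level = 2.
  // Three records: [1, 2], an empty/null list, [3]
  std::vector<int16_t> rep = {0, 1, 0, 0};
  std::vector<int16_t> def = {2, 2, 0, 2};
  int64_t values = 0;
  int64_t records = DelimitRecordsSketch(rep, def, 2, /*num_records=*/3, &values);
  std::cout << "records=" << records << " values=" << values << std::endl;
  // Prints: records=3 values=3
  return 0;
}

ReadRecordData then decodes exactly that many values (plus null slots when the
leaf is nullable) and advances the buffered level position.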

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/record_reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/record_reader.h b/src/parquet/arrow/record_reader.h
new file mode 100644
index 0000000..8d55f9d
--- /dev/null
+++ b/src/parquet/arrow/record_reader.h
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_RECORD_READER_H
+#define PARQUET_RECORD_READER_H
+
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include <arrow/buffer.h>
+#include <arrow/builder.h>
+#include <arrow/memory_pool.h>
+#include <arrow/util/bit-util.h>
+
+#include "parquet/column_page.h"
+#include "parquet/schema.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+namespace internal {
+
+/// \brief Stateful column reader that delimits semantic records for both flat
+/// and nested columns
+///
+/// \note API EXPERIMENTAL
+/// \since 1.3.0
+class RecordReader {
+ public:
+  // So that we can create subclasses
+  class RecordReaderImpl;
+
+  static std::shared_ptr<RecordReader> Make(
+      const ColumnDescriptor* descr,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  virtual ~RecordReader();
+
+  /// \brief Decoded definition levels
+  const int16_t* def_levels() const;
+
+  /// \brief Decoded repetition levels
+  const int16_t* rep_levels() const;
+
+  /// \brief Decoded values, including nulls, if any
+  const uint8_t* values() const;
+
+  /// \brief Attempt to read indicated number of records from column chunk
+  /// \return number of records read
+  int64_t ReadRecords(int64_t num_records);
+
+  /// \brief Pre-allocate space for data. Results in better flat read performance
+  void Reserve(int64_t num_values);
+
+  /// \brief Clear consumed values and repetition/definition levels as the
+  /// result of calling ReadRecords
+  void Reset();
+
+  std::shared_ptr<PoolBuffer> ReleaseValues();
+  std::shared_ptr<PoolBuffer> ReleaseIsValid();
+  ::arrow::ArrayBuilder* builder();
+
+  /// \brief Number of values written including nulls (if any)
+  int64_t values_written() const;
+
+  /// \brief Number of definition / repetition levels (from those that have
+  /// been decoded) that have been consumed inside the reader.
+  int64_t levels_position() const;
+
+  /// \brief Number of definition / repetition levels that have been written
+  /// internally in the reader
+  int64_t levels_written() const;
+
+  /// \brief Number of nulls in the leaf
+  int64_t null_count() const;
+
+  /// \brief True if the leaf values are nullable
+  bool nullable_values() const;
+
+  /// \brief Return true if the record reader has more internal data yet to
+  /// process
+  bool HasMoreData() const;
+
+  /// \brief Advance record reader to the next row group
+  /// \param[in] reader obtained from RowGroupReader::GetColumnPageReader
+  void SetPageReader(std::unique_ptr<PageReader> reader);
+
+ private:
+  std::unique_ptr<RecordReaderImpl> impl_;
+  explicit RecordReader(RecordReaderImpl* impl);
+};
+
+}  // namespace internal
+}  // namespace parquet
+
+#endif  // PARQUET_RECORD_READER_H
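
A hypothetical usage sketch of the experimental API declared above. The calls
used to obtain the column descriptor and page reader
(ParquetFileReader::OpenFile, RowGroupReader::GetColumnPageReader), the include
paths, the file name, and the batch size are assumptions about the existing
reader API rather than part of this patch.

// Sketch only: exercises RecordReader::Make / SetPageReader / ReadRecords
#include <cstdint>
#include <iostream>
#include <memory>

#include "parquet/arrow/record_reader.h"
#include "parquet/file/reader.h"  // assumed location of ParquetFileReader

int main() {
  std::unique_ptr<parquet::ParquetFileReader> file_reader =
      parquet::ParquetFileReader::OpenFile("example.parquet");  // hypothetical file

  const parquet::ColumnDescriptor* descr =
      file_reader->metadata()->schema()->Column(0);

  std::shared_ptr<parquet::internal::RecordReader> record_reader =
      parquet::internal::RecordReader::Make(descr);

  // Feed the reader one row group at a time
  for (int rg = 0; rg < file_reader->metadata()->num_row_groups(); ++rg) {
    record_reader->SetPageReader(file_reader->RowGroup(rg)->GetColumnPageReader(0));
    while (record_reader->HasMoreData()) {
      int64_t records = record_reader->ReadRecords(1024);  // illustrative batch size
      if (records == 0) break;  // row group exhausted
      std::cout << "records=" << records
                << " values=" << record_reader->values_written()
                << " nulls=" << record_reader->null_count() << std::endl;
      record_reader->Reset();  // drop consumed values and levels
    }
  }
  return 0;
}

The refactored parquet/arrow/reader in this patch drives RecordReader in a
similar fashion when assembling Arrow arrays.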

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.cc b/src/parquet/arrow/schema.cc
index 87e5b38..e16a1af 100644
--- a/src/parquet/arrow/schema.cc
+++ b/src/parquet/arrow/schema.cc
@@ -49,14 +49,14 @@ const auto TIMESTAMP_MS = ::arrow::timestamp(::arrow::TimeUnit::MILLI);
 const auto TIMESTAMP_US = ::arrow::timestamp(::arrow::TimeUnit::MICRO);
 const auto TIMESTAMP_NS = ::arrow::timestamp(::arrow::TimeUnit::NANO);
 
-TypePtr MakeDecimalType(const PrimitiveNode* node) {
-  int precision = node->decimal_metadata().precision;
-  int scale = node->decimal_metadata().scale;
+TypePtr MakeDecimalType(const PrimitiveNode& node) {
+  int precision = node.decimal_metadata().precision;
+  int scale = node.decimal_metadata().scale;
   return std::make_shared<::arrow::DecimalType>(precision, scale);
 }
 
-static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
-  switch (node->logical_type()) {
+static Status FromByteArray(const PrimitiveNode& node, TypePtr* out) {
+  switch (node.logical_type()) {
     case LogicalType::UTF8:
       *out = ::arrow::utf8();
       break;
@@ -71,17 +71,17 @@ static Status FromByteArray(const PrimitiveNode* node, TypePtr* out) {
   return Status::OK();
 }
 
-static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
-  switch (node->logical_type()) {
+static Status FromFLBA(const PrimitiveNode& node, TypePtr* out) {
+  switch (node.logical_type()) {
     case LogicalType::NONE:
-      *out = ::arrow::fixed_size_binary(node->type_length());
+      *out = ::arrow::fixed_size_binary(node.type_length());
       break;
     case LogicalType::DECIMAL:
       *out = MakeDecimalType(node);
       break;
     default:
       std::stringstream ss;
-      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+      ss << "Unhandled logical type " << LogicalTypeToString(node.logical_type())
          << " for fixed-length binary array";
       return Status::NotImplemented(ss.str());
       break;
@@ -90,8 +90,8 @@ static Status FromFLBA(const PrimitiveNode* node, TypePtr* out) {
   return Status::OK();
 }
 
-static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
-  switch (node->logical_type()) {
+static Status FromInt32(const PrimitiveNode& node, TypePtr* out) {
+  switch (node.logical_type()) {
     case LogicalType::NONE:
       *out = ::arrow::int32();
       break;
@@ -124,7 +124,7 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
       break;
     default:
       std::stringstream ss;
-      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+      ss << "Unhandled logical type " << LogicalTypeToString(node.logical_type())
          << " for INT32";
       return Status::NotImplemented(ss.str());
       break;
@@ -132,8 +132,8 @@ static Status FromInt32(const PrimitiveNode* node, TypePtr* out) {
   return Status::OK();
 }
 
-static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
-  switch (node->logical_type()) {
+static Status FromInt64(const PrimitiveNode& node, TypePtr* out) {
+  switch (node.logical_type()) {
     case LogicalType::NONE:
       *out = ::arrow::int64();
       break;
@@ -157,7 +157,7 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
       break;
     default:
       std::stringstream ss;
-      ss << "Unhandled logical type " << LogicalTypeToString(node->logical_type())
+      ss << "Unhandled logical type " << LogicalTypeToString(node.logical_type())
          << " for INT64";
       return Status::NotImplemented(ss.str());
       break;
@@ -165,13 +165,13 @@ static Status FromInt64(const PrimitiveNode* node, TypePtr* out) {
   return Status::OK();
 }
 
-Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
-  if (primitive->logical_type() == LogicalType::NA) {
+Status FromPrimitive(const PrimitiveNode& primitive, TypePtr* out) {
+  if (primitive.logical_type() == LogicalType::NA) {
     *out = ::arrow::null();
     return Status::OK();
   }
 
-  switch (primitive->physical_type()) {
+  switch (primitive.physical_type()) {
     case ParquetType::BOOLEAN:
       *out = ::arrow::boolean();
       break;
@@ -201,33 +201,33 @@ Status FromPrimitive(const PrimitiveNode* primitive, TypePtr* out) {
 }
 
 // Forward declaration
-Status NodeToFieldInternal(const NodePtr& node,
-                           const std::unordered_set<NodePtr>* included_leaf_nodes,
+Status NodeToFieldInternal(const Node& node,
+                           const std::unordered_set<const Node*>* included_leaf_nodes,
                            std::shared_ptr<Field>* out);
 
 /*
  * Auxiliary function to test if a parquet schema node is a leaf node
  * that should be included in a resulting arrow schema
  */
-inline bool IsIncludedLeaf(const NodePtr& node,
-                           const std::unordered_set<NodePtr>* included_leaf_nodes) {
+inline bool IsIncludedLeaf(const Node& node,
+                           const std::unordered_set<const Node*>* included_leaf_nodes) {
   if (included_leaf_nodes == nullptr) {
     return true;
   }
-  auto search = included_leaf_nodes->find(node);
+  auto search = included_leaf_nodes->find(&node);
   return (search != included_leaf_nodes->end());
 }
 
-Status StructFromGroup(const GroupNode* group,
-                       const std::unordered_set<NodePtr>* included_leaf_nodes,
+Status StructFromGroup(const GroupNode& group,
+                       const std::unordered_set<const Node*>* included_leaf_nodes,
                        TypePtr* out) {
   std::vector<std::shared_ptr<Field>> fields;
   std::shared_ptr<Field> field;
 
   *out = nullptr;
 
-  for (int i = 0; i < group->field_count(); i++) {
-    RETURN_NOT_OK(NodeToFieldInternal(group->field(i), included_leaf_nodes, &field));
+  for (int i = 0; i < group.field_count(); i++) {
+    RETURN_NOT_OK(NodeToFieldInternal(*group.field(i), included_leaf_nodes, &field));
     if (field != nullptr) {
       fields.push_back(field);
     }
@@ -238,22 +238,23 @@ Status StructFromGroup(const GroupNode* group,
   return Status::OK();
 }
 
-Status NodeToList(const GroupNode* group,
-                  const std::unordered_set<NodePtr>* included_leaf_nodes, TypePtr* out) {
+Status NodeToList(const GroupNode& group,
+                  const std::unordered_set<const Node*>* included_leaf_nodes,
+                  TypePtr* out) {
   *out = nullptr;
-  if (group->field_count() == 1) {
+  if (group.field_count() == 1) {
     // This attempts to resolve the preferred 3-level list encoding.
-    NodePtr list_node = group->field(0);
-    if (list_node->is_group() && list_node->is_repeated()) {
-      const GroupNode* list_group = static_cast<const GroupNode*>(list_node.get());
+    const Node& list_node = *group.field(0);
+    if (list_node.is_group() && list_node.is_repeated()) {
+      const auto& list_group = static_cast<const GroupNode&>(list_node);
       // Special case mentioned in the format spec:
       //   If the name is array or ends in _tuple, this should be a list of struct
       //   even for single child elements.
-      if (list_group->field_count() == 1 && !HasStructListName(*list_group)) {
+      if (list_group.field_count() == 1 && !HasStructListName(list_group)) {
         // List of primitive type
         std::shared_ptr<Field> item_field;
         RETURN_NOT_OK(
-            NodeToFieldInternal(list_group->field(0), included_leaf_nodes, &item_field));
+            NodeToFieldInternal(*list_group.field(0), included_leaf_nodes, &item_field));
 
         if (item_field != nullptr) {
           *out = ::arrow::list(item_field);
@@ -263,18 +264,17 @@ Status NodeToList(const GroupNode* group,
         std::shared_ptr<::arrow::DataType> inner_type;
         RETURN_NOT_OK(StructFromGroup(list_group, included_leaf_nodes, &inner_type));
         if (inner_type != nullptr) {
-          auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
+          auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
           *out = ::arrow::list(item_field);
         }
       }
-    } else if (list_node->is_repeated()) {
+    } else if (list_node.is_repeated()) {
       // repeated primitive node
       std::shared_ptr<::arrow::DataType> inner_type;
-      if (IsIncludedLeaf(static_cast<NodePtr>(list_node), included_leaf_nodes)) {
-        const PrimitiveNode* primitive =
-            static_cast<const PrimitiveNode*>(list_node.get());
-        RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
-        auto item_field = std::make_shared<Field>(list_node->name(), inner_type, false);
+      if (IsIncludedLeaf(static_cast<const Node&>(list_node), included_leaf_nodes)) {
+        RETURN_NOT_OK(
+            FromPrimitive(static_cast<const PrimitiveNode&>(list_node), &inner_type));
+        auto item_field = std::make_shared<Field>(list_node.name(), inner_type, false);
         *out = ::arrow::list(item_field);
       }
     } else {
@@ -288,49 +288,47 @@ Status NodeToList(const GroupNode* group,
   return Status::OK();
 }
 
-Status NodeToField(const NodePtr& node, std::shared_ptr<Field>* out) {
+Status NodeToField(const Node& node, std::shared_ptr<Field>* out) {
   return NodeToFieldInternal(node, nullptr, out);
 }
 
-Status NodeToFieldInternal(const NodePtr& node,
-                           const std::unordered_set<NodePtr>* included_leaf_nodes,
+Status NodeToFieldInternal(const Node& node,
+                           const std::unordered_set<const Node*>* included_leaf_nodes,
                            std::shared_ptr<Field>* out) {
   std::shared_ptr<::arrow::DataType> type = nullptr;
-  bool nullable = !node->is_required();
+  bool nullable = !node.is_required();
 
   *out = nullptr;
 
-  if (node->is_repeated()) {
+  if (node.is_repeated()) {
     // 1-level LIST encoding fields are required
     std::shared_ptr<::arrow::DataType> inner_type;
-    if (node->is_group()) {
-      const GroupNode* group = static_cast<const GroupNode*>(node.get());
-      RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &inner_type));
-    } else if (IsIncludedLeaf(static_cast<NodePtr>(node), included_leaf_nodes)) {
-      const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
-      RETURN_NOT_OK(FromPrimitive(primitive, &inner_type));
+    if (node.is_group()) {
+      RETURN_NOT_OK(StructFromGroup(static_cast<const GroupNode&>(node),
+                                    included_leaf_nodes, &inner_type));
+    } else if (IsIncludedLeaf(node, included_leaf_nodes)) {
+      RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &inner_type));
     }
     if (inner_type != nullptr) {
-      auto item_field = std::make_shared<Field>(node->name(), inner_type, false);
+      auto item_field = std::make_shared<Field>(node.name(), inner_type, false);
       type = ::arrow::list(item_field);
       nullable = false;
     }
-  } else if (node->is_group()) {
-    const GroupNode* group = static_cast<const GroupNode*>(node.get());
-    if (node->logical_type() == LogicalType::LIST) {
+  } else if (node.is_group()) {
+    const auto& group = static_cast<const GroupNode&>(node);
+    if (node.logical_type() == LogicalType::LIST) {
       RETURN_NOT_OK(NodeToList(group, included_leaf_nodes, &type));
     } else {
       RETURN_NOT_OK(StructFromGroup(group, included_leaf_nodes, &type));
     }
   } else {
     // Primitive (leaf) node
-    if (IsIncludedLeaf(static_cast<NodePtr>(node), included_leaf_nodes)) {
-      const PrimitiveNode* primitive = static_cast<const PrimitiveNode*>(node.get());
-      RETURN_NOT_OK(FromPrimitive(primitive, &type));
+    if (IsIncludedLeaf(node, included_leaf_nodes)) {
+      RETURN_NOT_OK(FromPrimitive(static_cast<const PrimitiveNode&>(node), &type));
     }
   }
   if (type != nullptr) {
-    *out = std::make_shared<Field>(node->name(), type, nullable);
+    *out = std::make_shared<Field>(node.name(), type, nullable);
   }
   return Status::OK();
 }
@@ -339,12 +337,12 @@ Status FromParquetSchema(
     const SchemaDescriptor* parquet_schema,
     const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
     std::shared_ptr<::arrow::Schema>* out) {
-  const GroupNode* schema_node = parquet_schema->group_node();
+  const GroupNode& schema_node = *parquet_schema->group_node();
 
-  int num_fields = static_cast<int>(schema_node->field_count());
+  int num_fields = static_cast<int>(schema_node.field_count());
   std::vector<std::shared_ptr<Field>> fields(num_fields);
   for (int i = 0; i < num_fields; i++) {
-    RETURN_NOT_OK(NodeToField(schema_node->field(i), &fields[i]));
+    RETURN_NOT_OK(NodeToField(*schema_node.field(i), &fields[i]));
   }
 
   *out = std::make_shared<::arrow::Schema>(fields, key_value_metadata);
@@ -362,13 +360,13 @@ Status FromParquetSchema(
   // Indices in column_indices should be unique; duplicate indices are merged
   // into one, ordered by their first appearance.
   int num_columns = static_cast<int>(column_indices.size());
-  std::unordered_set<NodePtr> top_nodes;  // to deduplicate the top nodes
-  std::vector<NodePtr> base_nodes;        // to keep the ordering
-  std::unordered_set<NodePtr> included_leaf_nodes(num_columns);
+  std::unordered_set<const Node*> top_nodes;  // to deduplicate the top nodes
+  std::vector<const Node*> base_nodes;        // to keep the ordering
+  std::unordered_set<const Node*> included_leaf_nodes(num_columns);
   for (int i = 0; i < num_columns; i++) {
-    auto column_desc = parquet_schema->Column(column_indices[i]);
-    included_leaf_nodes.insert(column_desc->schema_node());
-    auto column_root = parquet_schema->GetColumnRoot(column_indices[i]);
+    const ColumnDescriptor* column_desc = parquet_schema->Column(column_indices[i]);
+    included_leaf_nodes.insert(column_desc->schema_node().get());
+    const Node* column_root = parquet_schema->GetColumnRoot(column_indices[i]);
     auto insertion = top_nodes.insert(column_root);
     if (insertion.second) {
       base_nodes.push_back(column_root);
@@ -378,7 +376,7 @@ Status FromParquetSchema(
   std::vector<std::shared_ptr<Field>> fields;
   std::shared_ptr<Field> field;
   for (auto node : base_nodes) {
-    RETURN_NOT_OK(NodeToFieldInternal(node, &included_leaf_nodes, &field));
+    RETURN_NOT_OK(NodeToFieldInternal(*node, &included_leaf_nodes, &field));
     if (field != nullptr) {
       fields.push_back(field);
     }
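
A small sketch of the pointer-to-reference change in this file: NodeToField and
the FromPrimitive/From* helpers now take const Node& / const PrimitiveNode&
instead of NodePtr, and included_leaf_nodes is keyed by raw const Node*
pointers. Callers dereference the shared_ptr returned by the schema factories;
the node constructed below is illustrative.

// Sketch only: calling the refactored NodeToField with a dereferenced NodePtr
#include <memory>

#include <arrow/api.h>

#include "parquet/arrow/schema.h"
#include "parquet/schema.h"

int main() {
  parquet::schema::NodePtr node = parquet::schema::PrimitiveNode::Make(
      "score", parquet::Repetition::OPTIONAL, parquet::Type::DOUBLE);

  std::shared_ptr<arrow::Field> field;
  ::arrow::Status st = parquet::arrow::NodeToField(*node, &field);
  if (!st.ok()) return 1;
  // Expected: field->type() is arrow::float64() and field->nullable() is true
  return 0;
}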

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/arrow/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/schema.h b/src/parquet/arrow/schema.h
index 73e48f2..de153eb 100644
--- a/src/parquet/arrow/schema.h
+++ b/src/parquet/arrow/schema.h
@@ -37,8 +37,9 @@ namespace parquet {
 
 namespace arrow {
 
-::arrow::Status PARQUET_EXPORT NodeToField(const schema::NodePtr& node,
-                                           std::shared_ptr<::arrow::Field>* out);
+PARQUET_EXPORT
+::arrow::Status NodeToField(const schema::Node& node,
+                            std::shared_ptr<::arrow::Field>* out);
 
 /// Convert parquet schema to arrow schema with selected indices
 /// \param parquet_schema to be converted
@@ -48,16 +49,18 @@ namespace arrow {
 /// \param key_value_metadata optional metadata, can be nullptr
 /// \param out the corresponding arrow schema
 /// \return Status::OK() on a successful conversion.
-::arrow::Status PARQUET_EXPORT FromParquetSchema(
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(
     const SchemaDescriptor* parquet_schema, const std::vector<int>& column_indices,
     const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
     std::shared_ptr<::arrow::Schema>* out);
 
 // Without indices
-::arrow::Status PARQUET_EXPORT
-FromParquetSchema(const SchemaDescriptor* parquet_schema,
-                  const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
-                  std::shared_ptr<::arrow::Schema>* out);
+PARQUET_EXPORT
+::arrow::Status FromParquetSchema(
+    const SchemaDescriptor* parquet_schema,
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata,
+    std::shared_ptr<::arrow::Schema>* out);
 
 // Without metadata
 ::arrow::Status PARQUET_EXPORT FromParquetSchema(const SchemaDescriptor* parquet_schema,

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/column_reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index 5f6259f..d23e738 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -21,7 +21,10 @@
 #include <cstdint>
 #include <memory>
 
-#include "arrow/util/rle-encoding.h"
+#include <arrow/buffer.h>
+#include <arrow/memory_pool.h>
+#include <arrow/util/bit-util.h>
+#include <arrow/util/rle-encoding.h>
 
 #include "parquet/column_page.h"
 #include "parquet/encoding-internal.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/column_reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index df7deb8..6172365 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -19,6 +19,7 @@
 #define PARQUET_COLUMN_READER_H
 
 #include <algorithm>
+#include <climits>
 #include <cstdint>
 #include <cstring>
 #include <iostream>
@@ -26,6 +27,9 @@
 #include <unordered_map>
 #include <vector>
 
+#include <arrow/buffer.h>
+#include <arrow/builder.h>
+#include <arrow/memory_pool.h>
 #include <arrow/util/bit-util.h>
 
 #include "parquet/column_page.h"
@@ -45,6 +49,8 @@ class RleDecoder;
 
 namespace parquet {
 
+namespace BitUtil = ::arrow::BitUtil;
+
 class PARQUET_EXPORT LevelDecoder {
  public:
   LevelDecoder();
@@ -104,6 +110,12 @@ class PARQUET_EXPORT ColumnReader {
   // Returns the number of decoded repetition levels
   int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels);
 
+  int64_t available_values_current_page() const {
+    return num_buffered_values_ - num_decoded_values_;
+  }
+
+  void ConsumeBufferedValues(int64_t num_values) { num_decoded_values_ += num_values; }
+
   const ColumnDescriptor* descr_;
 
   std::unique_ptr<PageReader> pager_;
@@ -130,7 +142,58 @@ class PARQUET_EXPORT ColumnReader {
   ::arrow::MemoryPool* pool_;
 };
 
-// API to read values from a single column. This is the main client facing API.
+namespace internal {
+
+static inline void DefinitionLevelsToBitmap(
+    const int16_t* def_levels, int64_t num_def_levels, const int16_t max_definition_level,
+    const int16_t max_repetition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, const int64_t valid_bits_offset) {
+  int64_t byte_offset = valid_bits_offset / 8;
+  int64_t bit_offset = valid_bits_offset % 8;
+  uint8_t bitset = valid_bits[byte_offset];
+
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
+  for (int i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      bitset |= (1 << bit_offset);
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case
+      if (def_levels[i] == (max_definition_level - 1)) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        continue;
+      }
+    } else {
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
+    }
+
+    bit_offset++;
+    if (bit_offset == CHAR_BIT) {
+      bit_offset = 0;
+      valid_bits[byte_offset] = bitset;
+      byte_offset++;
+      // TODO: Except for the last byte, this shouldn't be needed
+      bitset = valid_bits[byte_offset];
+    }
+  }
+  if (bit_offset != 0) {
+    valid_bits[byte_offset] = bitset;
+  }
+  *values_read = bit_offset + byte_offset * 8 - valid_bits_offset;
+}
+
+}  // namespace internal
+
+// API to read values from a single column. This is one of the main client-facing APIs.
 template <typename DType>
 class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
  public:
@@ -138,7 +201,7 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
 
   TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> pager,
                     ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
-      : ColumnReader(schema, std::move(pager), pool), current_decoder_(NULL) {}
+      : ColumnReader(schema, std::move(pager), pool), current_decoder_(nullptr) {}
   virtual ~TypedColumnReader() {}
 
   // Read a batch of repetition levels, definition levels, and values from the
@@ -208,7 +271,7 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
   typedef Decoder<DType> DecoderType;
 
   // Advance to the next data page
-  virtual bool ReadNewPage();
+  bool ReadNewPage() override;
 
   // Read up to batch_size values from the current data page into the
   // pre-allocated memory T*
@@ -221,7 +284,7 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
   // to the def_levels.
   //
   // @returns: the number of values read into the out buffer
-  int64_t ReadValuesSpaced(int64_t batch_size, T* out, int null_count,
+  int64_t ReadValuesSpaced(int64_t batch_size, T* out, int64_t null_count,
                            uint8_t* valid_bits, int64_t valid_bits_offset);
 
   // Map of encoding type to the respective decoder object. For example, a
@@ -234,6 +297,9 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
   DecoderType* current_decoder_;
 };
 
+// ----------------------------------------------------------------------
+// Type column reader implementations
+
 template <typename DType>
 inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out) {
   int64_t num_decoded = current_decoder_->Decode(out, static_cast<int>(batch_size));
@@ -242,11 +308,12 @@ inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out)
 
 template <typename DType>
 inline int64_t TypedColumnReader<DType>::ReadValuesSpaced(int64_t batch_size, T* out,
-                                                          int null_count,
+                                                          int64_t null_count,
                                                           uint8_t* valid_bits,
                                                           int64_t valid_bits_offset) {
-  return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size), null_count,
-                                        valid_bits, valid_bits_offset);
+  return current_decoder_->DecodeSpaced(out, static_cast<int>(batch_size),
+                                        static_cast<int>(null_count), valid_bits,
+                                        valid_bits_offset);
 }
 
 template <typename DType>
@@ -294,59 +361,34 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size,
 
   *values_read = ReadValues(values_to_read, values);
   int64_t total_values = std::max(num_def_levels, *values_read);
-  num_decoded_values_ += total_values;
+  ConsumeBufferedValues(total_values);
 
   return total_values;
 }
 
-inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
-                                     int16_t max_definition_level,
-                                     int16_t max_repetition_level, int64_t* values_read,
-                                     int64_t* null_count, uint8_t* valid_bits,
-                                     int64_t valid_bits_offset) {
-  int byte_offset = static_cast<int>(valid_bits_offset) / 8;
-  int bit_offset = static_cast<int>(valid_bits_offset) % 8;
-  uint8_t bitset = valid_bits[byte_offset];
+namespace internal {
 
-  // TODO(itaiin): As an interim solution we are splitting the code path here
-  // between repeated+flat column reads, and non-repeated+nested reads.
-  // Those paths need to be merged in the future
-  for (int i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level) {
-      bitset |= (1 << bit_offset);
-    } else if (max_repetition_level > 0) {
-      // repetition+flat case
-      if (def_levels[i] == (max_definition_level - 1)) {
-        bitset &= ~(1 << bit_offset);
-        *null_count += 1;
-      } else {
-        continue;
-      }
-    } else {
-      // non-repeated+nested case
-      if (def_levels[i] < max_definition_level) {
-        bitset &= ~(1 << bit_offset);
-        *null_count += 1;
-      } else {
-        throw ParquetException("definition level exceeds maximum");
+// TODO(itaiin): another code path split to merge when the general case is done
+static inline bool HasSpacedValues(const ColumnDescriptor* descr) {
+  if (descr->max_repetition_level() > 0) {
+    // repeated+flat case
+    return !descr->schema_node()->is_required();
+  } else {
+    // non-repeated+nested case
+    // Find if a node forces nulls in the lowest level along the hierarchy
+    const schema::Node* node = descr->schema_node().get();
+    while (node) {
+      if (node->is_optional()) {
+        return true;
       }
+      node = node->parent();
     }
-
-    bit_offset++;
-    if (bit_offset == 8) {
-      bit_offset = 0;
-      valid_bits[byte_offset] = bitset;
-      byte_offset++;
-      // TODO: Except for the last byte, this shouldn't be needed
-      bitset = valid_bits[byte_offset];
-    }
+    return false;
   }
-  if (bit_offset != 0) {
-    valid_bits[byte_offset] = bitset;
-  }
-  *values_read = (bit_offset + byte_offset * 8 - valid_bits_offset);
 }
 
+}  // namespace internal
+
 template <typename DType>
 inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(
     int64_t batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
@@ -377,25 +419,7 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(
       }
     }
 
-    // TODO(itaiin): another code path split to merge when the general case is done
-    bool has_spaced_values;
-    if (descr_->max_repetition_level() > 0) {
-      // repeated+flat case
-      has_spaced_values = !descr_->schema_node()->is_required();
-    } else {
-      // non-repeated+nested case
-      // Find if a node forces nulls in the lowest level along the hierarchy
-      const schema::Node* node = descr_->schema_node().get();
-      has_spaced_values = false;
-      while (node) {
-        auto parent = node->parent();
-        if (node->is_optional()) {
-          has_spaced_values = true;
-          break;
-        }
-        node = parent;
-      }
-    }
+    const bool has_spaced_values = internal::HasSpacedValues(descr_);
 
     int64_t null_count = 0;
     if (!has_spaced_values) {
@@ -413,9 +437,9 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(
     } else {
       int16_t max_definition_level = descr_->max_definition_level();
       int16_t max_repetition_level = descr_->max_repetition_level();
-      DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
-                               max_repetition_level, values_read, &null_count, valid_bits,
-                               valid_bits_offset);
+      internal::DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
+                                         max_repetition_level, values_read, &null_count,
+                                         valid_bits, valid_bits_offset);
       total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
                                       valid_bits, valid_bits_offset);
     }
@@ -432,12 +456,12 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(
     *levels_read = total_values;
   }
 
-  num_decoded_values_ += *levels_read;
+  ConsumeBufferedValues(*levels_read);
   return total_values;
 }
 
 template <typename DType>
-inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
+int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
   int64_t rows_to_skip = num_rows_to_skip;
   while (HasNext() && rows_to_skip > 0) {
     // If the number of rows to skip is more than the number of undecoded values, skip the
@@ -472,6 +496,9 @@ inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
   return num_rows_to_skip - rows_to_skip;
 }
 
+// ----------------------------------------------------------------------
+// Template instantiations
+
 typedef TypedColumnReader<BooleanType> BoolReader;
 typedef TypedColumnReader<Int32Type> Int32Reader;
 typedef TypedColumnReader<Int64Type> Int64Reader;

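For orientation, here is a minimal sketch of what the relocated internal::DefinitionLevelsToBitmap helper computes for a flat optional column, assuming it keeps the signature of the removed inline version above and is reachable through parquet/column_reader.h; this is illustration only, with the expected results noted in comments:

    #include <cstdint>
    #include <iostream>

    #include "parquet/column_reader.h"

    int main() {
      // Flat optional column: max_definition_level = 1, max_repetition_level = 0.
      // A definition level equal to the maximum marks a present value; anything
      // lower marks a null slot in the validity bitmap.
      const int16_t def_levels[] = {1, 0, 1, 1};
      uint8_t valid_bits[1] = {0};
      int64_t values_read = 0;
      int64_t null_count = 0;
      parquet::internal::DefinitionLevelsToBitmap(
          def_levels, /*num_def_levels=*/4, /*max_definition_level=*/1,
          /*max_repetition_level=*/0, &values_read, &null_count, valid_bits,
          /*valid_bits_offset=*/0);
      // Expected: values_read == 4, null_count == 1, valid_bits[0] == 0b1101.
      std::cout << values_read << " " << null_count << " "
                << static_cast<int>(valid_bits[0]) << std::endl;
      return 0;
    }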
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/column_writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc
index 48f243e..47b86e3 100644
--- a/src/parquet/column_writer-test.cc
+++ b/src/parquet/column_writer-test.cc
@@ -491,7 +491,7 @@ TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
   auto writer = this->BuildWriter(LARGE_SIZE);
   // All values being written are NULL
   writer->WriteBatch(this->values_.size(), definition_levels.data(),
-                     repetition_levels.data(), NULL);
+                     repetition_levels.data(), nullptr);
   writer->Close();
 
   // Just read the first SMALL_SIZE rows to ensure we could read it back in

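As a companion to the all-null write/read-back test above, a hedged sketch of reading an optional INT32 column back through TypedColumnReader::ReadBatchSpaced; the file name is hypothetical, error handling is omitted, and the argument order follows the declaration in parquet/column_reader.h:

    #include <cstdint>
    #include <iostream>
    #include <memory>
    #include <vector>

    #include "parquet/column_reader.h"
    #include "parquet/file/reader.h"

    int main() {
      auto reader = parquet::ParquetFileReader::OpenFile("nulls.parquet");  // hypothetical file
      auto row_group = reader->RowGroup(0);
      auto col = std::static_pointer_cast<parquet::Int32Reader>(row_group->Column(0));

      const int64_t batch_size = 100;
      std::vector<int16_t> def_levels(batch_size);
      std::vector<int16_t> rep_levels(batch_size);
      std::vector<int32_t> values(batch_size);
      std::vector<uint8_t> valid_bits((batch_size + 7) / 8, 0);
      int64_t levels_read = 0;
      int64_t values_read = 0;
      int64_t null_count = 0;

      // For an all-null column every definition level is below the maximum, so
      // null_count ends up equal to levels_read and the bitmap stays all zeros.
      col->ReadBatchSpaced(batch_size, def_levels.data(), rep_levels.data(),
                           values.data(), valid_bits.data(), /*valid_bits_offset=*/0,
                           &levels_read, &values_read, &null_count);
      std::cout << "levels: " << levels_read << ", nulls: " << null_count << std::endl;
      return 0;
    }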
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/encoding-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index fab1f63..5818fd3 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -54,7 +54,7 @@ class PlainDecoder : public Decoder<DType> {
   using Decoder<DType>::num_values_;
 
   explicit PlainDecoder(const ColumnDescriptor* descr)
-      : Decoder<DType>(descr, Encoding::PLAIN), data_(NULL), len_(0) {
+      : Decoder<DType>(descr, Encoding::PLAIN), data_(nullptr), len_(0) {
     if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
       type_length_ = descr_->type_length();
     } else {
@@ -943,7 +943,7 @@ class DeltaByteArrayDecoder : public Decoder<ByteArrayType> {
     for (int i = 0; i < max_values; ++i) {
       int prefix_len = 0;
       prefix_len_decoder_.Decode(&prefix_len, 1);
-      ByteArray suffix = {0, NULL};
+      ByteArray suffix = {0, nullptr};
       suffix_decoder_.Decode(&suffix, 1);
       buffer[i].len = prefix_len + suffix.len;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encoding.h b/src/parquet/encoding.h
index 339eb35..e7ed415 100644
--- a/src/parquet/encoding.h
+++ b/src/parquet/encoding.h
@@ -113,6 +113,10 @@ class Decoder {
       throw ParquetException("Number of values / definition_levels read did not match");
     }
 
+    // Depending on the number of nulls, some of the value slots in buffer may
+    // be uninitialized, and this will cause valgrind warnings / potentially UB
+    memset(buffer + values_read, 0, (num_values - values_read) * sizeof(T));
+
     // Add spacing for null entries. As we have filled the buffer from the front,
     // we need to add the spacing from the back.
     int values_to_move = values_read;

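The added memset guards against downstream reads of value slots that the decoder never wrote. A self-contained mirror of the surrounding spacing step, using hypothetical names rather than the library's code, shows the back-to-front order:

    #include <cstdint>
    #include <cstring>

    // Densely decoded values occupy buffer[0..values_read). They are spread out
    // from the back so each value lands in the slot whose validity bit is set;
    // zeroing the tail first ensures null slots past the dense region hold a
    // defined value afterwards instead of uninitialized memory.
    template <typename T>
    void SpaceFromBack(T* buffer, int num_values, int null_count,
                       const uint8_t* valid_bits) {
      const int values_read = num_values - null_count;
      std::memset(buffer + values_read, 0,
                  (num_values - values_read) * sizeof(T));
      int values_to_move = values_read;
      for (int i = num_values - 1; i >= 0; --i) {
        if (valid_bits[i / 8] & (1 << (i % 8))) {
          buffer[i] = buffer[--values_to_move];
        }
      }
    }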
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 0d8e10e..7f990e0 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -91,7 +91,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
   // API convenience to get a MetaData accessor
   static std::unique_ptr<ColumnChunkMetaData> Make(
       const uint8_t* metadata, const ColumnDescriptor* descr,
-      const ApplicationVersion* writer_version = NULL);
+      const ApplicationVersion* writer_version = nullptr);
 
   ~ColumnChunkMetaData();
 
@@ -116,7 +116,7 @@ class PARQUET_EXPORT ColumnChunkMetaData {
 
  private:
   explicit ColumnChunkMetaData(const uint8_t* metadata, const ColumnDescriptor* descr,
-                               const ApplicationVersion* writer_version = NULL);
+                               const ApplicationVersion* writer_version = nullptr);
   // PIMPL Idiom
   class ColumnChunkMetaDataImpl;
   std::unique_ptr<ColumnChunkMetaDataImpl> impl_;
@@ -127,7 +127,7 @@ class PARQUET_EXPORT RowGroupMetaData {
   // API convenience to get a MetaData accessor
   static std::unique_ptr<RowGroupMetaData> Make(
       const uint8_t* metadata, const SchemaDescriptor* schema,
-      const ApplicationVersion* writer_version = NULL);
+      const ApplicationVersion* writer_version = nullptr);
 
   ~RowGroupMetaData();
 
@@ -141,7 +141,7 @@ class PARQUET_EXPORT RowGroupMetaData {
 
  private:
   explicit RowGroupMetaData(const uint8_t* metadata, const SchemaDescriptor* schema,
-                            const ApplicationVersion* writer_version = NULL);
+                            const ApplicationVersion* writer_version = nullptr);
   // PIMPL Idiom
   class RowGroupMetaDataImpl;
   std::unique_ptr<RowGroupMetaDataImpl> impl_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/file/printer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/printer.cc b/src/parquet/file/printer.cc
index 2ba9474..727552d 100644
--- a/src/parquet/file/printer.cc
+++ b/src/parquet/file/printer.cc
@@ -109,7 +109,7 @@ void ParquetFilePrinter::DebugPrint(std::ostream& stream, std::list<int> selecte
     char buffer[bufsize];
 
     // Create readers for selected columns and print contents
-    vector<std::shared_ptr<Scanner>> scanners(selected_columns.size(), NULL);
+    vector<std::shared_ptr<Scanner>> scanners(selected_columns.size(), nullptr);
     int j = 0;
     for (auto i : selected_columns) {
       std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 5ff7398..bc14ec9 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -100,7 +100,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
     }
 
     // Uncompress it if we need to
-    if (decompressor_ != NULL) {
+    if (decompressor_ != nullptr) {
       // Grow the uncompressed buffer if we need to.
       if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
         PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 26876fc..9b9bde9 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -56,6 +56,13 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
       const_cast<ReaderProperties*>(contents_->properties())->memory_pool());
 }
 
+std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
+  DCHECK(i < metadata()->num_columns()) << "The RowGroup only has "
+                                        << metadata()->num_columns()
+                                        << "columns, requested column: " << i;
+  return contents_->GetColumnPageReader(i);
+}
+
 // Returns the rowgroup metadata
 const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
 

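A hedged usage sketch for the new accessor: it exposes the raw page stream of a single column so that callers such as the new RecordReader can drive decoding themselves. The file name is hypothetical and error handling is omitted:

    #include <iostream>
    #include <memory>

    #include "parquet/column_page.h"
    #include "parquet/file/reader.h"

    int main() {
      std::unique_ptr<parquet::ParquetFileReader> reader =
          parquet::ParquetFileReader::OpenFile("example.parquet");  // hypothetical file
      std::shared_ptr<parquet::RowGroupReader> row_group = reader->RowGroup(0);

      // Page-level access to column 0; NextPage() returns nullptr at end of chunk.
      std::unique_ptr<parquet::PageReader> pages = row_group->GetColumnPageReader(0);
      int num_pages = 0;
      while (pages->NextPage() != nullptr) {
        ++num_pages;
      }
      std::cout << "pages in column 0 of row group 0: " << num_pages << std::endl;
      return 0;
    }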
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 0467640..7558394 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -58,6 +58,8 @@ class PARQUET_EXPORT RowGroupReader {
   // column. Ownership is shared with the RowGroupReader.
   std::shared_ptr<ColumnReader> Column(int i);
 
+  std::unique_ptr<PageReader> GetColumnPageReader(int i);
+
  private:
   // Holds a pointer to an instance of Contents implementation
   std::unique_ptr<Contents> contents_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/schema-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema-test.cc b/src/parquet/schema-test.cc
index faacb76..f806c12 100644
--- a/src/parquet/schema-test.cc
+++ b/src/parquet/schema-test.cc
@@ -125,17 +125,17 @@ TEST_F(TestPrimitiveNode, Attrs) {
   ASSERT_EQ(LogicalType::UTF8, node2.logical_type());
 
   // repetition
-  node1 = PrimitiveNode("foo", Repetition::REQUIRED, Type::INT32);
-  node2 = PrimitiveNode("foo", Repetition::OPTIONAL, Type::INT32);
   PrimitiveNode node3("foo", Repetition::REPEATED, Type::INT32);
-
-  ASSERT_TRUE(node1.is_required());
-
-  ASSERT_TRUE(node2.is_optional());
-  ASSERT_FALSE(node2.is_required());
+  PrimitiveNode node4("foo", Repetition::REQUIRED, Type::INT32);
+  PrimitiveNode node5("foo", Repetition::OPTIONAL, Type::INT32);
 
   ASSERT_TRUE(node3.is_repeated());
   ASSERT_FALSE(node3.is_optional());
+
+  ASSERT_TRUE(node4.is_required());
+
+  ASSERT_TRUE(node5.is_optional());
+  ASSERT_FALSE(node5.is_required());
 }
 
 TEST_F(TestPrimitiveNode, FromParquet) {
@@ -687,10 +687,10 @@ TEST_F(TestSchemaDescriptor, BuildTree) {
   ASSERT_TRUE(descr_.ColumnIndex(*non_column_alien.get()) < 0);
   ASSERT_TRUE(descr_.ColumnIndex(*non_column_familiar.get()) < 0);
 
-  ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0).get());
-  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3).get());
-  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4).get());
-  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5).get());
+  ASSERT_EQ(inta.get(), descr_.GetColumnRoot(0));
+  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(3));
+  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(4));
+  ASSERT_EQ(bag.get(), descr_.GetColumnRoot(5));
 
   ASSERT_EQ(schema.get(), descr_.group_node());
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/schema.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema.cc b/src/parquet/schema.cc
index ddd8ac1..8168dc4 100644
--- a/src/parquet/schema.cc
+++ b/src/parquet/schema.cc
@@ -20,6 +20,8 @@
 
 #include <algorithm>
 #include <memory>
+#include <sstream>
+#include <string>
 
 #include "parquet/exception.h"
 #include "parquet/parquet_types.h"
@@ -701,9 +703,15 @@ int SchemaDescriptor::ColumnIndex(const Node& node) const {
   return result;
 }
 
-const schema::NodePtr& SchemaDescriptor::GetColumnRoot(int i) const {
+const schema::Node* SchemaDescriptor::GetColumnRoot(int i) const {
   DCHECK(i >= 0 && i < static_cast<int>(leaves_.size()));
-  return leaf_to_base_.find(i)->second;
+  return leaf_to_base_.find(i)->second.get();
+}
+
+std::string SchemaDescriptor::ToString() const {
+  std::ostringstream ss;
+  PrintSchema(schema_.get(), ss);
+  return ss.str();
 }
 
 int ColumnDescriptor::type_scale() const {

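A short sketch of the revised schema accessors: GetColumnRoot now returns a non-owning const Node*, and the new ToString() renders the whole schema via PrintSchema. The file name is hypothetical:

    #include <iostream>

    #include "parquet/file/reader.h"
    #include "parquet/schema.h"

    int main() {
      auto reader = parquet::ParquetFileReader::OpenFile("example.parquet");  // hypothetical file
      const parquet::SchemaDescriptor* schema = reader->metadata()->schema();

      // Non-owning pointer to the root (top-level) node above the first leaf column.
      const parquet::schema::Node* root = schema->GetColumnRoot(0);
      std::cout << "column 0 root: " << root->name() << std::endl;

      // Printable form of the full schema.
      std::cout << schema->ToString() << std::endl;
      return 0;
    }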
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/schema.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema.h b/src/parquet/schema.h
index c6b7fbe..f93f0db 100644
--- a/src/parquet/schema.h
+++ b/src/parquet/schema.h
@@ -178,6 +178,9 @@ class PARQUET_EXPORT Node {
 
   bool EqualsInternal(const Node* other) const;
   void SetParent(const Node* p_parent);
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Node);
 };
 
 // Save our breath all over the place with these typedefs
@@ -255,7 +258,7 @@ class PARQUET_EXPORT GroupNode : public Node {
 
   bool Equals(const Node* other) const override;
 
-  const NodePtr& field(int i) const { return fields_[i]; }
+  NodePtr field(int i) const { return fields_[i]; }
   int FieldIndex(const std::string& name) const;
   int FieldIndex(const Node& node) const;
 
@@ -398,10 +401,12 @@ class PARQUET_EXPORT SchemaDescriptor {
   const schema::GroupNode* group_node() const { return group_node_; }
 
   // Returns the root (child of the schema root) node of the leaf(column) node
-  const schema::NodePtr& GetColumnRoot(int i) const;
+  const schema::Node* GetColumnRoot(int i) const;
 
   const std::string& name() const { return group_node_->name(); }
 
+  std::string ToString() const;
+
  private:
   friend class ColumnDescriptor;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/statistics-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index d3ec942..1521cbd 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -194,8 +194,9 @@ bool* TestRowGroupStatistics<BooleanType>::GetValuesPointer(std::vector<bool>& v
 }
 
 template <typename TestType>
-typename std::vector<typename TestType::c_type> TestRowGroupStatistics<
-    TestType>::GetDeepCopy(const std::vector<typename TestType::c_type>& values) {
+typename std::vector<typename TestType::c_type>
+TestRowGroupStatistics<TestType>::GetDeepCopy(
+    const std::vector<typename TestType::c_type>& values) {
   return values;
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index 3fd72f2..0698652 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -377,7 +377,7 @@ static void PaginatePlain(const ColumnDescriptor* d,
     }
     shared_ptr<DataPage> page = MakeDataPage<Type>(
         d, slice(values, value_start, value_start + values_per_page[i]),
-        values_per_page[i], encoding, NULL, 0,
+        values_per_page[i], encoding, nullptr, 0,
         slice(def_levels, def_level_start, def_level_end), max_def_level,
         slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
     pages.push_back(page);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/util/memory-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc
index cdb6d21..16617a7 100644
--- a/src/parquet/util/memory-test.cc
+++ b/src/parquet/util/memory-test.cc
@@ -214,7 +214,7 @@ TEST(ChunkedAllocatorTest, MemoryOverhead) {
 
   for (int i = 0; i < num_allocs; ++i) {
     uint8_t* mem = p.Allocate(alloc_size);
-    ASSERT_TRUE(mem != NULL);
+    ASSERT_TRUE(mem != nullptr);
     total_allocated += alloc_size;
 
     int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
@@ -239,7 +239,7 @@ TEST(ChunkedAllocatorTest, FragmentationOverhead) {
   for (int i = 0; i < num_allocs; ++i) {
     int alloc_size = i % 2 == 0 ? 1 : ChunkedAllocatorTest::MAX_CHUNK_SIZE;
     uint8_t* mem = p.Allocate(alloc_size);
-    ASSERT_TRUE(mem != NULL);
+    ASSERT_TRUE(mem != nullptr);
     total_allocated += alloc_size;
 
     int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/util/memory.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc
index d3a5226..3aa2570 100644
--- a/src/parquet/util/memory.cc
+++ b/src/parquet/util/memory.cc
@@ -121,14 +121,18 @@ void ChunkedAllocator::ReturnPartialAllocation(int byte_size) {
 
 template <bool CHECK_LIMIT_FIRST>
 uint8_t* ChunkedAllocator::Allocate(int size) {
-  if (size == 0) return NULL;
+  if (size == 0) {
+    return nullptr;
+  }
 
   int64_t num_bytes = ::arrow::BitUtil::RoundUp(size, 8);
   if (current_chunk_idx_ == -1 ||
       num_bytes + chunks_[current_chunk_idx_].allocated_bytes >
           chunks_[current_chunk_idx_].size) {
-    // If we couldn't allocate a new chunk, return NULL.
-    if (ARROW_PREDICT_FALSE(!FindChunk(num_bytes))) return NULL;
+    // If we couldn't allocate a new chunk, return nullptr.
+    if (ARROW_PREDICT_FALSE(!FindChunk(num_bytes))) {
+      return nullptr;
+    }
   }
   ChunkInfo& info = chunks_[current_chunk_idx_];
   uint8_t* result = info.data + info.allocated_bytes;
@@ -195,7 +199,7 @@ bool ChunkedAllocator::FindChunk(int64_t min_size) {
     // Allocate a new chunk. Return early if malloc fails.
     uint8_t* buf = nullptr;
     PARQUET_THROW_NOT_OK(pool_->Allocate(chunk_size, &buf));
-    if (ARROW_PREDICT_FALSE(buf == NULL)) {
+    if (ARROW_PREDICT_FALSE(buf == nullptr)) {
       DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
       current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
       return false;

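The allocation path above rounds each request up to 8 bytes and bump-allocates from the current chunk, taking a new chunk only when the request does not fit. A simplified, hypothetical stand-in (not the library class) keeps just that shape:

    #include <cstdint>
    #include <memory>
    #include <vector>

    // Hypothetical, simplified mirror of the chunked allocation idea: round the
    // request up to 8 bytes, serve it by bumping an offset in the current chunk,
    // and append a fresh chunk when the current one cannot fit the request.
    // Oversized requests (> kChunkSize) are not handled, unlike the real class.
    class TinyChunkedArena {
     public:
      uint8_t* Allocate(int size) {
        if (size == 0) {
          return nullptr;
        }
        const int64_t num_bytes = (size + 7) & ~int64_t{7};
        if (chunks_.empty() || offset_ + num_bytes > kChunkSize) {
          chunks_.emplace_back(new uint8_t[kChunkSize]);
          offset_ = 0;
        }
        uint8_t* result = chunks_.back().get() + offset_;
        offset_ += num_bytes;
        return result;
      }

     private:
      static constexpr int64_t kChunkSize = 4096;
      std::vector<std::unique_ptr<uint8_t[]>> chunks_;
      int64_t offset_ = 0;
    };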
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/util/memory.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index 04dcca4..94b86c1 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -188,7 +188,7 @@ class PARQUET_EXPORT ChunkedAllocator {
 
     explicit ChunkInfo(int64_t size, uint8_t* buf);
 
-    ChunkInfo() : data(NULL), size(0), allocated_bytes(0) {}
+    ChunkInfo() : data(nullptr), size(0), allocated_bytes(0) {}
   };
 
   /// chunk from which we served the last Allocate() call;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/4b09ac70/src/parquet/util/schema-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/schema-util.h b/src/parquet/util/schema-util.h
index 9187962..ef9087b 100644
--- a/src/parquet/util/schema-util.h
+++ b/src/parquet/util/schema-util.h
@@ -25,7 +25,6 @@
 #include "parquet/exception.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"
-#include "parquet/util/logging.h"
 
 using parquet::ParquetException;
 using parquet::SchemaDescriptor;
@@ -49,14 +48,14 @@ inline bool HasStructListName(const GroupNode& node) {
 }
 
 // TODO(itaiin): This aux. function is to be deleted once repeated structs are supported
-inline bool IsSimpleStruct(const NodePtr& node) {
+inline bool IsSimpleStruct(const Node* node) {
   if (!node->is_group()) return false;
   if (node->is_repeated()) return false;
   if (node->logical_type() == LogicalType::LIST) return false;
   // Special case mentioned in the format spec:
   //   If the name is array or ends in _tuple, this should be a list of struct
   //   even for single child elements.
-  auto group = static_cast<const GroupNode*>(node.get());
+  auto group = static_cast<const GroupNode*>(node);
   if (group->field_count() == 1 && HasStructListName(*group)) return false;
 
   return true;

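A hedged example of the format-spec special case mentioned above: a single-child group named "array" or ending in "_tuple" must be read as a list of structs, so IsSimpleStruct rejects it, while an identically shaped group with an ordinary name passes. This assumes the schema-util helpers are visible at namespace scope once the header is included:

    #include "parquet/schema.h"
    #include "parquet/util/schema-util.h"

    int main() {
      using parquet::Repetition;
      using parquet::Type;
      using parquet::schema::GroupNode;
      using parquet::schema::NodePtr;
      using parquet::schema::PrimitiveNode;

      NodePtr leaf1 = PrimitiveNode::Make("value", Repetition::REQUIRED, Type::INT32);
      NodePtr leaf2 = PrimitiveNode::Make("value", Repetition::REQUIRED, Type::INT32);

      // Name ends in "_tuple": format-spec special case, not a simple struct.
      NodePtr tuple_group = GroupNode::Make("element_tuple", Repetition::OPTIONAL, {leaf1});
      // Same shape, ordinary name: accepted as a simple (non-repeated) struct.
      NodePtr plain_group = GroupNode::Make("element", Repetition::OPTIONAL, {leaf2});

      bool not_simple = IsSimpleStruct(tuple_group.get());  // expected: false
      bool simple = IsSimpleStruct(plain_group.get());      // expected: true
      return (!not_simple && simple) ? 0 : 1;
    }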
