parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From n...@apache.org
Subject parquet-cpp git commit: PARQUET-538: Improve ColumnReader Tests
Date Wed, 24 Feb 2016 18:30:45 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 35a48fb26 -> 1df5a26d6


PARQUET-538: Improve ColumnReader Tests

closes #43 and closes #50
This PR also implements:
1) PARQUET-532: Null values detection needs to be fixed and tested
2) PARQUET-502: Scanner segfaults when its batch size is smaller than the number of rows
3) PARQUET-526: Add more complete unit test coverage for column Scanner implementations
4) PARQUET-531: Can't read past first page in a column

Author: Deepak Majeti <deepak.majeti@hp.com>

Closes #62 from majetideepak/PARQUET-538 and squashes the following commits:

1e56f83 [Deepak Majeti] Trigger notification
6478a7c [Deepak Majeti] TYPED_TEST
1d14171 [Deepak Majeti] Added Boolean Test and Scanner:Next API
d1da031 [Deepak Majeti] lint issue
45f10aa [Deepak Majeti] Reproducer for PARQUET-502
88e27c6 [Deepak Majeti] formatting
8aac435 [Deepak Majeti] PARQUET-526
dca7e2d [Deepak Majeti] PARQUET-532 and PARQUET-502 Fix
a622021 [Deepak Majeti] Reverted PARQUET-524 and addressed comments
859c1df [Deepak Majeti] minor comment edits
d938a13 [Deepak Majeti] PARQUET-538
df1fbd7 [Deepak Majeti] Templated single page tests
8548e3c [Deepak Majeti] PARQUET-524
c265fea [Deepak Majeti] fixed PARQUET-499 bugs


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/1df5a26d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/1df5a26d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/1df5a26d

Branch: refs/heads/master
Commit: 1df5a26d62386b38cdd544cfcbb2d009b32a77dd
Parents: 35a48fb
Author: Deepak Majeti <deepak.majeti@hp.com>
Authored: Wed Feb 24 10:30:46 2016 -0800
Committer: Nong Li <nongli@gmail.com>
Committed: Wed Feb 24 10:30:46 2016 -0800

----------------------------------------------------------------------
 src/parquet/column/CMakeLists.txt            |   1 +
 src/parquet/column/column-reader-test.cc     | 281 +++++++++-------------
 src/parquet/column/reader.h                  |   2 +-
 src/parquet/column/scanner-test.cc           | 268 +++++++++++++++++++++
 src/parquet/column/scanner.h                 |  42 +++-
 src/parquet/column/test-util.h               |  57 ++++-
 src/parquet/encodings/plain-encoding-test.cc |  19 +-
 src/parquet/schema/descriptor.h              |  12 +
 src/parquet/schema/schema-descriptor-test.cc |   2 +-
 src/parquet/schema/types.h                   |   5 +-
 src/parquet/util/test-common.h               |  30 ++-
 11 files changed, 508 insertions(+), 211 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/column/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt
index 99b4ed2..fa1ce7a 100644
--- a/src/parquet/column/CMakeLists.txt
+++ b/src/parquet/column/CMakeLists.txt
@@ -25,3 +25,4 @@ install(FILES
 
 ADD_PARQUET_TEST(column-reader-test)
 ADD_PARQUET_TEST(levels-test)
+ADD_PARQUET_TEST(scanner-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 3a2bbd8..079201a 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -44,195 +44,132 @@ namespace test {
 
 class TestPrimitiveReader : public ::testing::Test {
  public:
-  void SetUp() {}
-
-  void TearDown() {}
+  void MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page) {
+    num_levels_ = levels_per_page * num_pages;
+    num_values_ = 0;
+    uint32_t seed = 0;
+    int16_t zero = 0;
+    vector<int> values_per_page(num_pages, levels_per_page);
+    // Create definition levels
+    if (max_def_level_ > 0) {
+      def_levels_.resize(num_levels_);
+      random_numbers(num_levels_, seed, zero, max_def_level_, def_levels_.data());
+      for (int p = 0; p < num_pages; p++) {
+        int num_values_per_page = 0;
+        for (int i = 0; i < levels_per_page; i++) {
+          if (def_levels_[i + p * levels_per_page] == max_def_level_) {
+            num_values_per_page++;
+            num_values_++;
+          }
+        }
+        values_per_page[p] = num_values_per_page;
+      }
+    } else {
+      num_values_ = num_levels_;
+    }
+    // Create repetition levels
+    if (max_rep_level_ > 0) {
+      rep_levels_.resize(num_levels_);
+      random_numbers(num_levels_, seed, zero, max_rep_level_, rep_levels_.data());
+    }
+    // Create values
+    values_.resize(num_values_);
+    random_numbers(num_values_, seed, std::numeric_limits<int32_t>::min(),
+       std::numeric_limits<int32_t>::max(), values_.data());
+    Paginate<Type::INT32, int32_t>(d, values_, def_levels_, max_def_level_,
+        rep_levels_, max_rep_level_, levels_per_page, values_per_page, pages_);
+  }
 
-  void InitReader(const ColumnDescriptor* descr) {
+  void InitReader(const ColumnDescriptor* d) {
+    std::unique_ptr<PageReader> pager_;
     pager_.reset(new test::MockPageReader(pages_));
-    reader_ = ColumnReader::Make(descr, std::move(pager_));
+    reader_ = ColumnReader::Make(d, std::move(pager_));
+  }
+
+  void CheckResults() {
+    vector<int32_t> vresult(num_values_, -1);
+    vector<int16_t> dresult(num_levels_, -1);
+    vector<int16_t> rresult(num_levels_, -1);
+    size_t values_read = 0;
+    size_t total_values_read = 0;
+    size_t batch_actual = 0;
+
+    Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+    int32_t batch_size = 8;
+    size_t batch = 0;
+    // This covers both cases:
+    // 1) batch_size < page_size (multiple ReadBatch calls from a single page)
+    // 2) batch_size > page_size (ReadBatch is limited to a single page)
+    do {
+      batch = reader->ReadBatch(batch_size, &dresult[0] + batch_actual,
+          &rresult[0] + batch_actual, &vresult[0] + total_values_read, &values_read);
+      total_values_read += values_read;
+      batch_actual += batch;
+      batch_size = std::max(batch_size * 2, 4096);
+    } while (batch > 0);
+
+    ASSERT_EQ(num_levels_, batch_actual);
+    ASSERT_EQ(num_values_, total_values_read);
+    ASSERT_TRUE(vector_equal(values_, vresult));
+    if (max_def_level_ > 0) {
+      ASSERT_TRUE(vector_equal(def_levels_, dresult));
+    }
+    if (max_rep_level_ > 0) {
+      ASSERT_TRUE(vector_equal(rep_levels_, rresult));
+    }
+    // catch improper writes at EOS
+    batch_actual = reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read);
+    ASSERT_EQ(0, batch_actual);
+    ASSERT_EQ(0, values_read);
+  }
+
+  void execute(int num_pages, int levels_page, const ColumnDescriptor *d) {
+    MakePages(d, num_pages, levels_page);
+    InitReader(d);
+    CheckResults();
   }
 
  protected:
-  std::shared_ptr<ColumnReader> reader_;
-  std::unique_ptr<PageReader> pager_;
+  int num_levels_;
+  int num_values_;
+  int16_t max_def_level_;
+  int16_t max_rep_level_;
   vector<shared_ptr<Page> > pages_;
+  std::shared_ptr<ColumnReader> reader_;
+  vector<int32_t> values_;
+  vector<int16_t> def_levels_;
+  vector<int16_t> rep_levels_;
 };
 
-
 TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
-  vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
-
-  std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, {}, 0,
-    {}, 0);
-  pages_.push_back(page);
-
+  int levels_per_page = 100;
+  int num_pages = 50;
+  max_def_level_ = 0;
+  max_rep_level_ = 0;
   NodePtr type = schema::Int32("a", Repetition::REQUIRED);
-  ColumnDescriptor descr(type, 0, 0);
-  InitReader(&descr);
-
-  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
-
-  vector<int32_t> result(10, -1);
-
-  size_t values_read = 0;
-  size_t batch_actual = reader->ReadBatch(10, nullptr, nullptr,
-      &result[0], &values_read);
-  ASSERT_EQ(10, batch_actual);
-  ASSERT_EQ(10, values_read);
-
-  ASSERT_TRUE(vector_equal(result, values));
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  execute(num_pages, levels_per_page, &descr);
 }
 
-
 TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
-  vector<int32_t> values = {1, 2, 3, 4, 5};
-  vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};
-
-  std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, def_levels, 1,
-    {}, 0);
-
-  pages_.push_back(page);
-
-  NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
-  ColumnDescriptor descr(type, 1, 0);
-  InitReader(&descr);
-
-  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
-
-  size_t values_read = 0;
-  size_t batch_actual = 0;
-
-  vector<int32_t> vresult(3, -1);
-  vector<int16_t> dresult(5, -1);
-
-  batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
-      &vresult[0], &values_read);
-  ASSERT_EQ(5, batch_actual);
-  ASSERT_EQ(3, values_read);
-
-  ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
-  ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));
-
-  batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
-      &vresult[0], &values_read);
-  ASSERT_EQ(5, batch_actual);
-  ASSERT_EQ(2, values_read);
-
-  ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
-  ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));
-
-  // EOS, pass all nullptrs to check for improper writes. Do not segfault /
-  // core dump
-  batch_actual = reader->ReadBatch(5, nullptr, nullptr,
-      nullptr, &values_read);
-  ASSERT_EQ(0, batch_actual);
-  ASSERT_EQ(0, values_read);
+  int levels_per_page = 100;
+  int num_pages = 50;
+  max_def_level_ = 4;
+  max_rep_level_ = 0;
+  NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  execute(num_pages, levels_per_page, &descr);
 }
 
 TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
-  vector<int32_t> values = {1, 2, 3, 4, 5};
-  vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
-  vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};
-
-  std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values,
-      def_levels, 2, rep_levels, 1);
-
-  pages_.push_back(page);
-
-  NodePtr type = schema::Int32("a", Repetition::REPEATED);
-  ColumnDescriptor descr(type, 2, 1);
-  InitReader(&descr);
-
-  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
-
-  size_t values_read = 0;
-  size_t batch_actual = 0;
-
-  vector<int32_t> vresult(3, -1);
-  vector<int16_t> dresult(5, -1);
-  vector<int16_t> rresult(5, -1);
-
-  batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
-      &vresult[0], &values_read);
-  ASSERT_EQ(5, batch_actual);
-  ASSERT_EQ(3, values_read);
-
-  ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
-  ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));
-  ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 0, 5)));
-
-  batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
-      &vresult[0], &values_read);
-  ASSERT_EQ(5, batch_actual);
-  ASSERT_EQ(2, values_read);
-
-  ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
-  ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));
-  ASSERT_TRUE(vector_equal(rresult, slice(rep_levels, 5, 10)));
-
-  // EOS, pass all nullptrs to check for improper writes. Do not segfault /
-  // core dump
-  batch_actual = reader->ReadBatch(5, nullptr, nullptr,
-      nullptr, &values_read);
-  ASSERT_EQ(0, batch_actual);
-  ASSERT_EQ(0, values_read);
+  int levels_per_page = 100;
+  int num_pages = 50;
+  max_def_level_ = 4;
+  max_rep_level_ = 2;
+  NodePtr type = schema::Int32("c", Repetition::REPEATED);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  execute(num_pages, levels_per_page, &descr);
 }
 
-TEST_F(TestPrimitiveReader, TestInt32FlatRepeatedMultiplePages) {
-  vector<int32_t> values[2] = {{1, 2, 3, 4, 5},
-    {6, 7, 8, 9, 10}};
-  vector<int16_t> def_levels[2] = {{2, 1, 1, 2, 2, 1, 1, 2, 2, 1},
-    {2, 2, 1, 2, 1, 1, 2, 1, 2, 1}};
-  vector<int16_t> rep_levels[2] = {{0, 1, 1, 0, 0, 1, 1, 0, 0, 1},
-    {0, 0, 1, 0, 1, 1, 0, 1, 0, 1}};
-
-  std::shared_ptr<DataPage> page;
-
-  for (int i = 0; i < 4; i++) {
-    page = MakeDataPage<Type::INT32>(values[i % 2],
-        def_levels[i % 2], 2, rep_levels[i % 2], 1);
-    pages_.push_back(page);
-  }
-
-  NodePtr type = schema::Int32("a", Repetition::REPEATED);
-  ColumnDescriptor descr(type, 2, 1);
-  InitReader(&descr);
-
-  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
-
-  size_t values_read = 0;
-  size_t batch_actual = 0;
-
-  vector<int32_t> vresult(3, -1);
-  vector<int16_t> dresult(5, -1);
-  vector<int16_t> rresult(5, -1);
-
-  for (int i = 0; i < 4; i++) {
-    batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
-        &vresult[0], &values_read);
-    ASSERT_EQ(5, batch_actual);
-    ASSERT_EQ(3, values_read);
-
-    ASSERT_TRUE(vector_equal(vresult, slice(values[i % 2], 0, 3)));
-    ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 0, 5)));
-    ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 0, 5)));
-
-    batch_actual = reader->ReadBatch(5, &dresult[0], &rresult[0],
-        &vresult[0], &values_read);
-    ASSERT_EQ(5, batch_actual);
-    ASSERT_EQ(2, values_read);
-
-    ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values[i % 2], 3, 5)));
-    ASSERT_TRUE(vector_equal(dresult, slice(def_levels[i % 2], 5, 10)));
-    ASSERT_TRUE(vector_equal(rresult, slice(rep_levels[i % 2], 5, 10)));
-  }
-  // EOS, pass all nullptrs to check for improper writes. Do not segfault /
-  // core dump
-  batch_actual = reader->ReadBatch(5, nullptr, nullptr,
-      nullptr, &values_read);
-  ASSERT_EQ(0, batch_actual);
-  ASSERT_EQ(0, values_read);
-}
 } // namespace test
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index d11a13c..dc23dd9 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -122,7 +122,7 @@ class TypedColumnReader : public ColumnReader {
   // This API is the same for both V1 and V2 of the DataPage
   //
   // @returns: actual number of levels read (see values_read for number of values read)
-  size_t ReadBatch(int batch_size, int16_t* def_levels, int16_t* rep_levels,
+  size_t ReadBatch(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
       T* values, size_t* values_read);
 
  private:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/column/scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc
new file mode 100644
index 0000000..be6b42e
--- /dev/null
+++ b/src/parquet/column/scanner-test.cc
@@ -0,0 +1,268 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "parquet/types.h"
+#include "parquet/column/page.h"
+#include "parquet/column/scanner.h"
+#include "parquet/column/test-util.h"
+#include "parquet/schema/descriptor.h"
+#include "parquet/schema/types.h"
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+using std::shared_ptr;
+
+namespace parquet_cpp {
+
+using schema::NodePtr;
+
+bool operator==(const Int96& a, const Int96& b) {
+  return a.value[0] == b.value[0] &&
+    a.value[1] == b.value[1] &&
+    a.value[2] == b.value[2];
+}
+
+bool operator==(const ByteArray& a, const ByteArray& b) {
+  return a.len == b.len && 0 == memcmp(a.ptr, b.ptr, a.len);
+}
+
+static int FLBA_LENGTH = 12;
+bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
+  return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
+}
+
+namespace test {
+
+template <int N> class TypeValue {
+ public:
+  static const int value = N;
+};
+template <int N> const int TypeValue<N>::value;
+
+template <typename TYPE>
+class TestFlatScanner : public ::testing::Test {
+ public:
+  typedef typename type_traits<TYPE::value>::value_type T;
+
+  void InitValues() {
+    random_numbers(num_values_, 0, std::numeric_limits<T>::min(),
+        std::numeric_limits<T>::max(), values_.data());
+  }
+
+  void MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page) {
+    num_levels_ = levels_per_page * num_pages;
+    num_values_ = 0;
+    uint32_t seed = 0;
+    int16_t zero = 0;
+    int16_t max_def_level = d->max_definition_level();
+    int16_t max_rep_level = d->max_repetition_level();
+    vector<int> values_per_page(num_pages, levels_per_page);
+    // Create definition levels
+    if (max_def_level > 0) {
+      def_levels_.resize(num_levels_);
+      random_numbers(num_levels_, seed, zero, max_def_level, def_levels_.data());
+      for (int p = 0; p < num_pages; p++) {
+        int num_values_per_page = 0;
+        for (int i = 0; i < levels_per_page; i++) {
+          if (def_levels_[i + p * levels_per_page] == max_def_level) {
+            num_values_per_page++;
+            num_values_++;
+          }
+        }
+        values_per_page[p] = num_values_per_page;
+      }
+    } else {
+      num_values_ = num_levels_;
+    }
+    // Create repetition levels
+    if (max_rep_level > 0) {
+      rep_levels_.resize(num_levels_);
+      random_numbers(num_levels_, seed, zero, max_rep_level, rep_levels_.data());
+    }
+    // Create values
+    values_.resize(num_values_);
+    InitValues();
+    Paginate<TYPE::value>(d, values_, def_levels_, max_def_level,
+        rep_levels_, max_rep_level, levels_per_page, values_per_page, pages_);
+  }
+
+  void InitScanner(const ColumnDescriptor *d) {
+    std::unique_ptr<PageReader> pager(new test::MockPageReader(pages_));
+    scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager)));
+  }
+
+  void CheckResults(int batch_size, const ColumnDescriptor *d) {
+    TypedScanner<TYPE::value>* scanner =
+      reinterpret_cast<TypedScanner<TYPE::value>* >(scanner_.get());
+    T val;
+    bool is_null;
+    int16_t def_level;
+    int16_t rep_level;
+    size_t j = 0;
+    scanner->SetBatchSize(batch_size);
+    for (size_t i = 0; i < num_levels_; i++) {
+      ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null)) << i << j;
+      if (!is_null) {
+        ASSERT_EQ(values_[j++], val) << i <<"V"<< j;
+      }
+      if (!d->is_required()) {
+        ASSERT_EQ(def_levels_[i], def_level) << i <<"D"<< j;
+      }
+      if (d->is_repeated()) {
+        ASSERT_EQ(rep_levels_[i], rep_level) << i <<"R"<< j;
+      }
+    }
+    ASSERT_EQ(num_values_, j);
+    ASSERT_FALSE(scanner->HasNext());
+  }
+
+  void Clear() {
+    pages_.clear();
+    values_.clear();
+    def_levels_.clear();
+    rep_levels_.clear();
+  }
+
+  void Execute(int num_pages, int levels_page, int batch_size,
+      const ColumnDescriptor *d) {
+    MakePages(d, num_pages, levels_page);
+    InitScanner(d);
+    CheckResults(batch_size, d);
+    Clear();
+  }
+
+  void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1,
+      std::shared_ptr<ColumnDescriptor>& d2, std::shared_ptr<ColumnDescriptor>& d3) {
+    NodePtr type;
+    type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
+        static_cast<Type::type>(TYPE::value));
+    d1.reset(new ColumnDescriptor(type, 0, 0));
+    type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL,
+        static_cast<Type::type>(TYPE::value));
+    d2.reset(new ColumnDescriptor(type, 4, 0));
+    type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED,
+        static_cast<Type::type>(TYPE::value));
+    d3.reset(new ColumnDescriptor(type, 4, 2));
+  }
+
+  void ExecuteAll(int num_pages, int num_levels, int batch_size) {
+    std::shared_ptr<ColumnDescriptor> d1;
+    std::shared_ptr<ColumnDescriptor> d2;
+    std::shared_ptr<ColumnDescriptor> d3;
+    InitDescriptors(d1, d2, d3);
+    // evaluate REQUIRED pages
+    Execute(num_pages, num_levels, batch_size, d1.get());
+    // evaluate OPTIONAL pages
+    Execute(num_pages, num_levels, batch_size, d2.get());
+    // evaluate REPEATED pages
+    Execute(num_pages, num_levels, batch_size, d3.get());
+  }
+
+ protected:
+  int num_levels_;
+  int num_values_;
+  vector<shared_ptr<Page> > pages_;
+  std::shared_ptr<Scanner> scanner_;
+  vector<T> values_;
+  vector<int16_t> def_levels_;
+  vector<int16_t> rep_levels_;
+  vector<uint8_t> data_buffer_; // For BA and FLBA
+};
+
+template<>
+void TestFlatScanner<TypeValue<Type::BOOLEAN> >::InitValues() {
+  values_ = flip_coins(num_values_, 0);
+}
+
+template<>
+void TestFlatScanner<TypeValue<Type::INT96> >::InitValues() {
+  random_Int96_numbers(num_values_, 0, std::numeric_limits<int32_t>::min(),
+      std::numeric_limits<int32_t>::max(), values_.data());
+}
+
+template<>
+void TestFlatScanner<TypeValue<Type::BYTE_ARRAY> >::InitValues() {
+  int max_byte_array_len = 12;
+  int num_bytes = max_byte_array_len + sizeof(uint32_t);
+  size_t nbytes = num_values_ * num_bytes;
+  data_buffer_.resize(nbytes);
+  random_byte_array(num_values_, 0, data_buffer_.data(), values_.data(),
+      max_byte_array_len);
+}
+
+template<>
+void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitValues() {
+  size_t nbytes = num_values_ * FLBA_LENGTH;
+  data_buffer_.resize(nbytes);
+  random_fixed_byte_array(num_values_, 0, data_buffer_.data(), FLBA_LENGTH,
+      values_.data());
+}
+
+template<>
+void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitDescriptors(
+    std::shared_ptr<ColumnDescriptor>& d1, std::shared_ptr<ColumnDescriptor>& d2,
+    std::shared_ptr<ColumnDescriptor>& d3) {
+  NodePtr type = schema::PrimitiveNode::MakeFLBA("c1", Repetition::REQUIRED,
+      FLBA_LENGTH, LogicalType::UTF8);
+  d1.reset(new ColumnDescriptor(type, 0, 0));
+  type = schema::PrimitiveNode::MakeFLBA("c2", Repetition::OPTIONAL,
+      FLBA_LENGTH, LogicalType::UTF8);
+  d2.reset(new ColumnDescriptor(type, 4, 0));
+  type = schema::PrimitiveNode::MakeFLBA("c3", Repetition::REPEATED,
+      FLBA_LENGTH, LogicalType::UTF8);
+  d3.reset(new ColumnDescriptor(type, 4, 2));
+}
+
+typedef TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY>> TestFlatFLBAScanner;
+
+static int num_levels_per_page = 100;
+static int num_pages = 20;
+static int batch_size = 32;
+
+typedef ::testing::Types<TypeValue<Type::BOOLEAN>, TypeValue<Type::INT32>,
+    TypeValue<Type::INT64>, TypeValue<Type::INT96>, TypeValue<Type::FLOAT>,
+    TypeValue<Type::DOUBLE>, TypeValue<Type::BYTE_ARRAY>,
+    TypeValue<Type::FIXED_LEN_BYTE_ARRAY> > Primitives;
+
+TYPED_TEST_CASE(TestFlatScanner, Primitives);
+
+TYPED_TEST(TestFlatScanner, TestScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size);
+}
+
+// PARQUET-502: Scanner segfaults when its batch size is smaller than the number of rows
+TEST_F(TestFlatFLBAScanner, TestSmallBatch) {
+  NodePtr type = schema::PrimitiveNode::MakeFLBA("c1", Repetition::REQUIRED,
+      FLBA_LENGTH, LogicalType::UTF8);
+  const ColumnDescriptor d(type, 0, 0);
+  MakePages(&d, 1, 100);
+  InitScanner(&d);
+  CheckResults(1, &d);
+}
+
+} // namespace test
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
index 512f540..f3f5719 100644
--- a/src/parquet/column/scanner.h
+++ b/src/parquet/column/scanner.h
@@ -45,8 +45,8 @@ class Scanner {
       values_buffered_(0),
       reader_(reader) {
     // TODO: don't allocate for required fields
-    def_levels_.resize(batch_size_);
-    rep_levels_.resize(batch_size_);
+    def_levels_.resize(reader->descr()->is_optional() ? batch_size_ : 0);
+    rep_levels_.resize(reader->descr()->is_repeated() ? batch_size_ : 0);
   }
 
   virtual ~Scanner() {}
@@ -57,7 +57,7 @@ class Scanner {
   virtual void PrintNext(std::ostream& out, int width) = 0;
 
   bool HasNext() {
-    return value_offset_ < values_buffered_ || reader_->HasNext();
+    return level_offset_ < levels_buffered_ || reader_->HasNext();
   }
 
   const ColumnDescriptor* descr() const {
@@ -108,21 +108,45 @@ class TypedScanner : public Scanner {
       levels_buffered_ = typed_reader_->ReadBatch(batch_size_, &def_levels_[0],
           &rep_levels_[0], values_, &values_buffered_);
 
-      // TODO: repetition levels
-
+      value_offset_ = 0;
       level_offset_ = 0;
       if (!levels_buffered_) {
         return false;
       }
     }
-    *def_level = def_levels_[level_offset_++];
-    *rep_level = 1;
+    *def_level = descr()->is_optional() ?
+      def_levels_[level_offset_] : descr()->max_definition_level();
+    *rep_level = descr()->is_repeated() ?
+      rep_levels_[level_offset_] : descr()->max_repetition_level();
+    level_offset_++;
+    return true;
+  }
+
+  bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) {
+     if (level_offset_ == levels_buffered_) {
+      if (!HasNext()) {
+        // Out of data pages
+        return false;
+      }
+    }
+
+    NextLevels(def_level, rep_level);
+    *is_null = *def_level < descr()->max_definition_level();
+
+    if (*is_null) {
+      return true;
+    }
+
+    if (value_offset_ == values_buffered_) {
+      throw ParquetException("Value was non-null, but has not been buffered");
+    }
+    *val = values_[value_offset_++];
     return true;
   }
 
   // Returns true if there is a next value
   bool NextValue(T* val, bool* is_null) {
-    if (value_offset_ == values_buffered_) {
+    if (level_offset_ == levels_buffered_) {
       if (!HasNext()) {
         // Out of data pages
         return false;
@@ -133,7 +157,7 @@ class TypedScanner : public Scanner {
     int16_t def_level;
     int16_t rep_level;
     NextLevels(&def_level, &rep_level);
-    *is_null = def_level < rep_level;
+    *is_null = def_level < descr()->max_definition_level();
 
     if (*is_null) {
       return true;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 99f56b1..1854ebb 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -33,6 +33,7 @@
 // Depended on by SerializedPageReader test utilities for now
 #include "parquet/encodings/plain-encoding.h"
 #include "parquet/util/input.h"
+#include "parquet/util/test-common.h"
 
 namespace parquet_cpp {
 
@@ -96,14 +97,14 @@ class DataPageBuilder {
     have_rep_levels_ = true;
   }
 
-  void AppendValues(const std::vector<T>& values,
+  void AppendValues(const ColumnDescriptor *d, const std::vector<T>& values,
       Encoding::type encoding = Encoding::PLAIN) {
     if (encoding != Encoding::PLAIN) {
       ParquetException::NYI("only plain encoding currently implemented");
     }
     size_t bytes_to_encode = values.size() * sizeof(T);
 
-    PlainEncoder<TYPE> encoder(nullptr);
+    PlainEncoder<TYPE> encoder(d);
     encoder.Encode(&values[0], values.size(), sink_);
 
     num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
@@ -164,8 +165,25 @@ class DataPageBuilder {
   }
 };
 
+template<>
+void DataPageBuilder<Type::BOOLEAN>::AppendValues(const ColumnDescriptor *d,
+    const std::vector<bool>& values, Encoding::type encoding) {
+  if (encoding != Encoding::PLAIN) {
+    ParquetException::NYI("only plain encoding currently implemented");
+  }
+  size_t bytes_to_encode = values.size() * sizeof(bool);
+
+  PlainEncoder<Type::BOOLEAN> encoder(d);
+  encoder.Encode(values, values.size(), sink_);
+
+  num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
+  encoding_ = encoding;
+  have_values_ = true;
+}
+
 template <int TYPE, typename T>
-static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
+static std::shared_ptr<DataPage> MakeDataPage(const ColumnDescriptor *d,
+    const std::vector<T>& values,
     const std::vector<int16_t>& def_levels, int16_t max_def_level,
     const std::vector<int16_t>& rep_levels, int16_t max_rep_level) {
   size_t num_values = values.size();
@@ -181,7 +199,7 @@ static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
     page_builder.AppendDefLevels(def_levels, max_def_level);
   }
 
-  page_builder.AppendValues(values);
+  page_builder.AppendValues(d, values);
 
   auto buffer = page_stream.GetBuffer();
 
@@ -191,6 +209,37 @@ static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
       page_builder.rep_level_encoding());
 }
 
+template <int TYPE, typename T>
+static void Paginate(const ColumnDescriptor *d,
+    const std::vector<T>& values,
+    const std::vector<int16_t>& def_levels, int16_t max_def_level,
+    const std::vector<int16_t>& rep_levels, int16_t max_rep_level,
+    int num_levels_per_page, const std::vector<int>& values_per_page,
+    std::vector<std::shared_ptr<Page> >& pages) {
+  int num_pages = values_per_page.size();
+  int def_level_start = 0;
+  int def_level_end = 0;
+  int rep_level_start = 0;
+  int rep_level_end = 0;
+  int value_start = 0;
+  for (int i = 0; i < num_pages; i++) {
+    if (max_def_level > 0) {
+      def_level_start = i * num_levels_per_page;
+      def_level_end = (i + 1) * num_levels_per_page;
+    }
+    if (max_rep_level > 0) {
+      rep_level_start = i * num_levels_per_page;
+      rep_level_end = (i + 1) * num_levels_per_page;
+    }
+    std::shared_ptr<DataPage> page = MakeDataPage<TYPE>(d,
+        slice(values, value_start, value_start + values_per_page[i]),
+        slice(def_levels, def_level_start, def_level_end), max_def_level,
+        slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
+    pages.push_back(page);
+    value_start += values_per_page[i];
+  }
+}
+
 } // namespace test
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/encodings/plain-encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc
index 04eb907..7ebd21f 100644
--- a/src/parquet/encodings/plain-encoding-test.cc
+++ b/src/parquet/encodings/plain-encoding-test.cc
@@ -81,7 +81,8 @@ class EncodeDecode{
 
   void generate_data() {
     // seed the prng so failure is deterministic
-    random_numbers(num_values_, 0, draws_);
+    random_numbers(num_values_, 0, std::numeric_limits<T>::min(),
+       std::numeric_limits<T>::max(), draws_);
   }
 
   void encode_decode(ColumnDescriptor *d) {
@@ -130,6 +131,13 @@ void EncodeDecode<bool, Type::BOOLEAN>::generate_data() {
 }
 
 template<>
+void EncodeDecode<Int96, Type::INT96>::generate_data() {  // fills draws_ via random_Int96_numbers
+  // seed the prng with a fixed seed (0) so failure is deterministic
+  random_Int96_numbers(num_values_, 0, std::numeric_limits<int32_t>::min(),
+      std::numeric_limits<int32_t>::max(), draws_);
+}
+
+template<>
 void EncodeDecode<Int96, Type::INT96>::verify_results() {
   for (size_t i = 0; i < num_values_; ++i) {
     ASSERT_EQ(draws_[i].value[0], decode_buf_[i].value[0]) << i;
@@ -141,8 +149,9 @@ void EncodeDecode<Int96, Type::INT96>::verify_results() {
 template<>
 void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::generate_data() {
   // seed the prng so failure is deterministic
-  int max_byte_array_len = 12 + sizeof(uint32_t);
-  size_t nbytes = num_values_ * max_byte_array_len;
+  int max_byte_array_len = 12;
+  int num_bytes = max_byte_array_len + sizeof(uint32_t);
+  size_t nbytes = num_values_ * num_bytes;
   data_buffer_.resize(nbytes);
   random_byte_array(num_values_, 0, data_buffer_.data(), draws_,
       max_byte_array_len);
@@ -168,7 +177,7 @@ void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::generate_data() {
 
 template<>
 void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::verify_results() {
-  for (size_t i = 0; i < 1000; ++i) {
+  for (size_t i = 0; i < num_values_; ++i) {  // check every generated value, not a fixed count
     ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, flba_length)) << i;
   }
 }
@@ -213,7 +222,7 @@ TEST(BAEncodeDecode, TestEncodeDecode) {
 TEST(FLBAEncodeDecode, TestEncodeDecode) {
   schema::NodePtr node;
   node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL,
-      Type::FIXED_LEN_BYTE_ARRAY, flba_length, LogicalType::UTF8);
+      flba_length, LogicalType::UTF8);
   ColumnDescriptor d(node, 0, 0);
   EncodeDecode<FixedLenByteArray, Type::FIXED_LEN_BYTE_ARRAY> obj;
   obj.execute(num_values, &d);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/schema/descriptor.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema/descriptor.h b/src/parquet/schema/descriptor.h
index 7991dea..62066ef 100644
--- a/src/parquet/schema/descriptor.h
+++ b/src/parquet/schema/descriptor.h
@@ -58,6 +58,18 @@ class ColumnDescriptor {
     return primitive_node_->name();
   }
 
+  bool is_required() const {  // true when the column has no definition levels
+    return max_definition_level_ == 0;  // NOTE(review): a REQUIRED leaf nested under an OPTIONAL/REPEATED ancestor has a nonzero max definition level, so it reports false here — confirm intended
+  }
+
+  bool is_optional() const {  // true whenever definition levels are present
+    return max_definition_level_ > 0;  // NOTE(review): presumably also true for REPEATED leaves and for REQUIRED leaves under optional/repeated ancestors — confirm intended
+  }
+
+  bool is_repeated() const {  // true whenever repetition levels are present
+    return max_repetition_level_ > 0;  // NOTE(review): presumably also true for a non-repeated leaf nested under a repeated group — confirm intended
+  }
+
   int type_length() const;
 
  private:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/schema/schema-descriptor-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc
index 83b136d..7677615 100644
--- a/src/parquet/schema/schema-descriptor-test.cc
+++ b/src/parquet/schema/schema-descriptor-test.cc
@@ -51,7 +51,7 @@ TEST(TestColumnDescriptor, TestAttrs) {
 
   // Test FIXED_LEN_BYTE_ARRAY
   node = PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL,
-      Type::FIXED_LEN_BYTE_ARRAY, 12, LogicalType::UTF8);
+      12, LogicalType::UTF8);
   descr = ColumnDescriptor(node, 4, 1);
 
   ASSERT_EQ(Type::FIXED_LEN_BYTE_ARRAY, descr.physical_type());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/schema/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema/types.h b/src/parquet/schema/types.h
index 83b9fd2..e76323f 100644
--- a/src/parquet/schema/types.h
+++ b/src/parquet/schema/types.h
@@ -183,10 +183,9 @@ class PrimitiveNode : public Node {
 
   // Alternate constructor for FIXED_LEN_BYTE_ARRAY (FLBA)
   static inline NodePtr MakeFLBA(const std::string& name,
-      Repetition::type repetition, Type::type type,
-      int32_t type_length,
+      Repetition::type repetition, int32_t type_length,  // type_length is stored on the node via SetTypeLength
       LogicalType::type logical_type = LogicalType::NONE) {
-    NodePtr result = Make(name, repetition, type, logical_type);
+    NodePtr result = Make(name, repetition, Type::FIXED_LEN_BYTE_ARRAY, logical_type);  // physical type is fixed by this factory
     static_cast<PrimitiveNode*>(result.get())->SetTypeLength(type_length);
     return result;
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1df5a26d/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
index d8961d1..49c4131 100644
--- a/src/parquet/util/test-common.h
+++ b/src/parquet/util/test-common.h
@@ -106,49 +106,46 @@ void random_bytes(int n, uint32_t seed, std::vector<uint8_t>* out) {
   }
 }
 
-template <typename T>
-void random_numbers(int n, uint32_t seed, T* out) {
+void random_bools(int n, double p, uint32_t seed, bool* out) {  // fills out[0..n) from a seed-determined stream. NOTE(review): non-inline definition in a header; including it from two TUs of one binary is a multiple-definition link error
   std::mt19937 gen(seed);
-    std::uniform_real_distribution<T> d(std::numeric_limits<T>::lowest(),
-        std::numeric_limits<T>::max());
+  std::bernoulli_distribution d(p);  // each draw is true with probability p
   for (int i = 0; i < n; ++i) {
     out[i] = d(gen);
   }
 }
 
-void random_bools(int n, double p, uint32_t seed, bool* out) {
+template <typename T>  // fills out[0..n) with values drawn uniformly from the closed range [min_value, max_value]
+void random_numbers(int n, uint32_t seed, T min_value, T max_value, T* out) {
   std::mt19937 gen(seed);
-  std::bernoulli_distribution d(p);
+  std::uniform_int_distribution<T> d(min_value, max_value);  // NOTE(review): only defined for short/int/long/long long and their unsigned forms; T = bool/char/int8_t/uint8_t is undefined behavior
   for (int i = 0; i < n; ++i) {
     out[i] = d(gen);
   }
 }
 
 template <>
-void random_numbers(int n, uint32_t seed, int32_t* out) {
+void random_numbers(int n, uint32_t seed, float min_value, float max_value, float* out) {  // NOTE(review): explicit specializations are not implicitly inline; defining one in a header risks multiple-definition link errors
   std::mt19937 gen(seed);
-  std::uniform_int_distribution<int32_t> d(std::numeric_limits<int32_t>::lowest(),
-      std::numeric_limits<int32_t>::max());
+  std::uniform_real_distribution<float> d(min_value, max_value);  // draws from [min_value, max_value); requires max_value - min_value <= FLT_MAX
   for (int i = 0; i < n; ++i) {
     out[i] = d(gen);
   }
 }
 
 template <>
-void random_numbers(int n, uint32_t seed, int64_t* out) {
+void random_numbers(int n, uint32_t seed, double min_value, double max_value,
+    double* out) {  // NOTE(review): explicit specializations are not implicitly inline; defining one in a header risks multiple-definition link errors
   std::mt19937 gen(seed);
-  std::uniform_int_distribution<int64_t> d(std::numeric_limits<int64_t>::lowest(),
-      std::numeric_limits<int64_t>::max());
+  std::uniform_real_distribution<double> d(min_value, max_value);  // draws from [min_value, max_value); requires max_value - min_value <= DBL_MAX
   for (int i = 0; i < n; ++i) {
     out[i] = d(gen);
   }
 }
 
-template <>
-void random_numbers(int n, uint32_t seed, Int96* out) {
+void random_Int96_numbers(int n, uint32_t seed, int32_t min_value, int32_t max_value,
+    Int96* out) {
   std::mt19937 gen(seed);
-  std::uniform_int_distribution<uint32_t> d(std::numeric_limits<uint32_t>::lowest(),
-      std::numeric_limits<uint32_t>::max());
+  std::uniform_int_distribution<int32_t> d(min_value, max_value);
   for (int i = 0; i < n; ++i) {
     out[i].value[0] = d(gen);
     out[i].value[1] = d(gen);
@@ -183,6 +180,7 @@ void random_byte_array(int n, uint32_t seed, uint8_t *buf,
     buf += out[i].len;
   }
 }
+
 } // namespace test
 } // namespace parquet_cpp
 


Mime
View raw message