parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-549: Add column reader tests for dictionary pages
Date Thu, 03 Mar 2016 05:44:56 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 5a624c069 -> 70f5088f7


PARQUET-549: Add column reader tests for dictionary pages

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes #71 from majetideepak/PARQUET-549 and squashes the following commits:

134cba3 [Deepak Majeti] fixed clang compilation error
18fd2c9 [Deepak Majeti] addressed comments
ee47cef [Deepak Majeti] split indices onto multiple pages
06bc0af [Deepak Majeti] resolve clang error
09674f6 [Deepak Majeti] comment edits
10ed327 [Deepak Majeti] re-structured MakePages
e00eafe [Deepak Majeti] added scanner tests
832706f [Deepak Majeti] column Reader Test passes
4280ae7 [Deepak Majeti] rebased
ae65369 [Deepak Majeti] Test for Column Reader Plain Dictionary Pages
6f9f451 [Deepak Majeti] re-structured code for better reuse


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/70f5088f
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/70f5088f
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/70f5088f

Branch: refs/heads/master
Commit: 70f5088f75a2aa18d2d57114e7a7f6da342a99d1
Parents: 5a624c0
Author: Deepak Majeti <deepak.majeti@hpe.com>
Authored: Wed Mar 2 21:44:42 2016 -0800
Committer: Wes McKinney <wesm@apache.org>
Committed: Wed Mar 2 21:44:42 2016 -0800

----------------------------------------------------------------------
 src/parquet/column/column-reader-test.cc |  65 +++---
 src/parquet/column/scanner-test.cc       | 165 +++++++--------
 src/parquet/column/test-util.h           | 294 ++++++++++++++++++++++----
 3 files changed, 361 insertions(+), 163 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/70f5088f/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index a5b918f..855669a 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -45,42 +45,6 @@ namespace test {
 
 class TestPrimitiveReader : public ::testing::Test {
  public:
-  void MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page) {
-    num_levels_ = levels_per_page * num_pages;
-    num_values_ = 0;
-    uint32_t seed = 0;
-    int16_t zero = 0;
-    vector<int> values_per_page(num_pages, levels_per_page);
-    // Create definition levels
-    if (max_def_level_ > 0) {
-      def_levels_.resize(num_levels_);
-      random_numbers(num_levels_, seed, zero, max_def_level_, def_levels_.data());
-      for (int p = 0; p < num_pages; p++) {
-        int num_values_per_page = 0;
-        for (int i = 0; i < levels_per_page; i++) {
-          if (def_levels_[i + p * levels_per_page] == max_def_level_) {
-            num_values_per_page++;
-            num_values_++;
-          }
-        }
-        values_per_page[p] = num_values_per_page;
-      }
-    } else {
-      num_values_ = num_levels_;
-    }
-    // Create repitition levels
-    if (max_rep_level_ > 0) {
-      rep_levels_.resize(num_levels_);
-      random_numbers(num_levels_, seed, zero, max_rep_level_, rep_levels_.data());
-    }
-    // Create values
-    values_.resize(num_values_);
-    random_numbers(num_values_, seed, std::numeric_limits<int32_t>::min(),
-       std::numeric_limits<int32_t>::max(), values_.data());
-    Paginate<Type::INT32, int32_t>(d, values_, def_levels_, max_def_level_,
-        rep_levels_, max_rep_level_, levels_per_page, values_per_page, pages_);
-  }
-
   void InitReader(const ColumnDescriptor* d) {
     std::unique_ptr<PageReader> pager_;
     pager_.reset(new test::MockPageReader(pages_));
@@ -124,8 +88,23 @@ class TestPrimitiveReader : public ::testing::Test {
     ASSERT_EQ(0, values_read);
   }
 
-  void execute(int num_pages, int levels_page, const ColumnDescriptor *d) {
-    MakePages(d, num_pages, levels_page);
+  void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor *d) {
+    num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
+        rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
+    num_levels_ = num_pages * levels_per_page;
+    InitReader(d);
+    CheckResults();
+    values_.clear();
+    def_levels_.clear();
+    rep_levels_.clear();
+    pages_.clear();
+    reader_.reset();
+  }
+
+  void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor *d) {
+    num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
+        rep_levels_, values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
+    num_levels_ = num_pages * levels_per_page;
     InitReader(d);
     CheckResults();
   }
@@ -140,6 +119,7 @@ class TestPrimitiveReader : public ::testing::Test {
   vector<int32_t> values_;
   vector<int16_t> def_levels_;
   vector<int16_t> rep_levels_;
+  vector<uint8_t> data_buffer_; // For BA and FLBA
 };
 
 TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
@@ -149,7 +129,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
   max_rep_level_ = 0;
   NodePtr type = schema::Int32("a", Repetition::REQUIRED);
   const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
-  execute(num_pages, levels_per_page, &descr);
+  ExecutePlain(num_pages, levels_per_page, &descr);
+  ExecuteDict(num_pages, levels_per_page, &descr);
 }
 
 TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
@@ -159,7 +140,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
   max_rep_level_ = 0;
   NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
   const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
-  execute(num_pages, levels_per_page, &descr);
+  ExecutePlain(num_pages, levels_per_page, &descr);
+  ExecuteDict(num_pages, levels_per_page, &descr);
 }
 
 TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
@@ -169,7 +151,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
   max_rep_level_ = 2;
   NodePtr type = schema::Int32("c", Repetition::REPEATED);
   const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
-  execute(num_pages, levels_per_page, &descr);
+  ExecutePlain(num_pages, levels_per_page, &descr);
+  ExecuteDict(num_pages, levels_per_page, &descr);
 }
 
 } // namespace test

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/70f5088f/src/parquet/column/scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc
index 32c1ea5..b52a993 100644
--- a/src/parquet/column/scanner-test.cc
+++ b/src/parquet/column/scanner-test.cc
@@ -41,59 +41,58 @@ namespace parquet_cpp {
 using schema::NodePtr;
 
 static int FLBA_LENGTH = 12;
+
 bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
   return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
 }
 
 namespace test {
 
+template<>
+void InitValues<bool>(int num_values,  vector<bool>& values,
+     vector<uint8_t>& buffer) {
+  values = flip_coins(num_values, 0);
+}
+
+template<>
+void InitValues<Int96>(int num_values, vector<Int96>& values,
+    vector<uint8_t>& buffer) {
+  random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(),
+      std::numeric_limits<int32_t>::max(), values.data());
+}
+
+template<>
+void InitValues<ByteArray>(int num_values, vector<ByteArray>& values,
+    vector<uint8_t>& buffer) {
+  int max_byte_array_len = 12;
+  int num_bytes = max_byte_array_len + sizeof(uint32_t);
+  size_t nbytes = num_values * num_bytes;
+  buffer.resize(nbytes);
+  random_byte_array(num_values, 0, buffer.data(), values.data(),
+      max_byte_array_len);
+}
+
+template<>
+void InitValues<FLBA>(int num_values, vector<FLBA>& values,
+    vector<uint8_t>& buffer) {
+  size_t nbytes = num_values * FLBA_LENGTH;
+  buffer.resize(nbytes);
+  random_fixed_byte_array(num_values, 0, buffer.data(), FLBA_LENGTH,
+      values.data());
+}
+
+template<>
+void InitDictValues<bool>(int num_values, int dict_per_page,
+    vector<bool>& values, vector<uint8_t>& buffer) {
+  // No op for bool
+}
+
+
 template <typename Type>
 class TestFlatScanner : public ::testing::Test {
  public:
   typedef typename Type::c_type T;
 
-  void InitValues() {
-    random_numbers(num_values_, 0, std::numeric_limits<T>::min(),
-        std::numeric_limits<T>::max(), values_.data());
-  }
-
-  void MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page) {
-    num_levels_ = levels_per_page * num_pages;
-    num_values_ = 0;
-    uint32_t seed = 0;
-    int16_t zero = 0;
-    int16_t max_def_level = d->max_definition_level();
-    int16_t max_rep_level = d->max_repetition_level();
-    vector<int> values_per_page(num_pages, levels_per_page);
-    // Create definition levels
-    if (max_def_level > 0) {
-      def_levels_.resize(num_levels_);
-      random_numbers(num_levels_, seed, zero, max_def_level, def_levels_.data());
-      for (int p = 0; p < num_pages; p++) {
-        int num_values_per_page = 0;
-        for (int i = 0; i < levels_per_page; i++) {
-          if (def_levels_[i + p * levels_per_page] == max_def_level) {
-            num_values_per_page++;
-            num_values_++;
-          }
-        }
-        values_per_page[p] = num_values_per_page;
-      }
-    } else {
-      num_values_ = num_levels_;
-    }
-    // Create repitition levels
-    if (max_rep_level > 0) {
-      rep_levels_.resize(num_levels_);
-      random_numbers(num_levels_, seed, zero, max_rep_level, rep_levels_.data());
-    }
-    // Create values
-    values_.resize(num_values_);
-    InitValues();
-    Paginate<Type::type_num>(d, values_, def_levels_, max_def_level,
-        rep_levels_, max_rep_level, levels_per_page, values_per_page, pages_);
-  }
-
   void InitScanner(const ColumnDescriptor *d) {
     std::unique_ptr<PageReader> pager(new test::MockPageReader(pages_));
     scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager)));
@@ -111,7 +110,8 @@ class TestFlatScanner : public ::testing::Test {
     for (int i = 0; i < num_levels_; i++) {
       ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null)) << i << j;
       if (!is_null) {
-        ASSERT_EQ(values_[j++], val) << i <<"V"<< j;
+        ASSERT_EQ(values_[j], val) << i <<"V"<< j;
+        j++;
       }
       if (d->max_definition_level() > 0) {
         ASSERT_EQ(def_levels_[i], def_level) << i <<"D"<< j;
@@ -131,9 +131,11 @@ class TestFlatScanner : public ::testing::Test {
     rep_levels_.clear();
   }
 
-  void Execute(int num_pages, int levels_page, int batch_size,
-      const ColumnDescriptor *d) {
-    MakePages(d, num_pages, levels_page);
+  void Execute(int num_pages, int levels_per_page, int batch_size,
+      const ColumnDescriptor *d, Encoding::type encoding) {
+    num_values_ = MakePages<Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
+        values_, data_buffer_, pages_, encoding);
+    num_levels_ = num_pages * levels_per_page;
     InitScanner(d);
     CheckResults(batch_size, d);
     Clear();
@@ -154,17 +156,18 @@ class TestFlatScanner : public ::testing::Test {
     d3.reset(new ColumnDescriptor(type, 4, 2));
   }
 
-  void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length) {
+  void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length,
+      Encoding::type encoding = Encoding::PLAIN) {
     std::shared_ptr<ColumnDescriptor> d1;
     std::shared_ptr<ColumnDescriptor> d2;
     std::shared_ptr<ColumnDescriptor> d3;
     InitDescriptors(d1, d2, d3, type_length);
     // evaluate REQUIRED pages
-    Execute(num_pages, num_levels, batch_size, d1.get());
+    Execute(num_pages, num_levels, batch_size, d1.get(), encoding);
     // evaluate OPTIONAL pages
-    Execute(num_pages, num_levels, batch_size, d2.get());
+    Execute(num_pages, num_levels, batch_size, d2.get(), encoding);
     // evaluate REPEATED pages
-    Execute(num_pages, num_levels, batch_size, d3.get());
+    Execute(num_pages, num_levels, batch_size, d3.get(), encoding);
   }
 
  protected:
@@ -178,62 +181,50 @@ class TestFlatScanner : public ::testing::Test {
   vector<uint8_t> data_buffer_; // For BA and FLBA
 };
 
-template<>
-void TestFlatScanner<BooleanType>::InitValues() {
-  values_ = flip_coins(num_values_, 0);
-}
-
-template<>
-void TestFlatScanner<Int96Type>::InitValues() {
-  random_Int96_numbers(num_values_, 0, std::numeric_limits<int32_t>::min(),
-      std::numeric_limits<int32_t>::max(), values_.data());
-}
-
-template<>
-void TestFlatScanner<ByteArrayType>::InitValues() {
-  int max_byte_array_len = 12;
-  int num_bytes = max_byte_array_len + sizeof(uint32_t);
-  int nbytes = num_values_ * num_bytes;
-  data_buffer_.resize(nbytes);
-  random_byte_array(num_values_, 0, data_buffer_.data(), values_.data(),
-      max_byte_array_len);
-}
-
-template<>
-void TestFlatScanner<FLBAType>::InitValues() {
-  int nbytes = num_values_ * FLBA_LENGTH;
-  data_buffer_.resize(nbytes);
-  random_fixed_byte_array(num_values_, 0, data_buffer_.data(), FLBA_LENGTH,
-      values_.data());
-}
-
 typedef TestFlatScanner<FLBAType> TestFlatFLBAScanner;
 
 static int num_levels_per_page = 100;
 static int num_pages = 20;
 static int batch_size = 32;
 
-typedef ::testing::Types<BooleanType, Int32Type, Int64Type, Int96Type,
+typedef ::testing::Types<Int32Type, Int64Type, Int96Type,
                          FloatType, DoubleType, ByteArrayType> TestTypes;
 
+typedef TestFlatScanner<BooleanType> TestBooleanFlatScanner;
 typedef TestFlatScanner<FLBAType> TestFLBAFlatScanner;
 
 TYPED_TEST_CASE(TestFlatScanner, TestTypes);
 
-TYPED_TEST(TestFlatScanner, TestScanner) {
+TYPED_TEST(TestFlatScanner, TestPlainScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, Encoding::PLAIN);
+}
+
+TYPED_TEST(TestFlatScanner, TestDictScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0,
+      Encoding::RLE_DICTIONARY);
+}
+
+TEST_F(TestBooleanFlatScanner, TestPlainScanner) {
   this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0);
 }
 
-TEST_F(TestFLBAFlatScanner, TestScanner) {
+TEST_F(TestFLBAFlatScanner, TestPlainScanner) {
   this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH);
 }
 
+TEST_F(TestFLBAFlatScanner, TestDictScanner) {
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH,
+      Encoding::RLE_DICTIONARY);
+}
+
 //PARQUET 502
 TEST_F(TestFlatFLBAScanner, TestSmallBatch) {
   NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
       Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
   const ColumnDescriptor d(type, 0, 0);
-  MakePages(&d, 1, 100);
+  num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
+      data_buffer_, pages_);
+  num_levels_ = 1 * 100;
   InitScanner(&d);
   CheckResults(1, &d);
 }
@@ -242,7 +233,9 @@ TEST_F(TestFlatFLBAScanner, TestDescriptorAPI) {
   NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
       Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
   const ColumnDescriptor d(type, 4, 0);
-  MakePages(&d, 1, 100);
+  num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
+      data_buffer_, pages_);
+  num_levels_ = 1 * 100;
   InitScanner(&d);
   TypedScanner<FLBAType::type_num>* scanner =
     reinterpret_cast<TypedScanner<FLBAType::type_num>* >(scanner_.get());
@@ -255,7 +248,9 @@ TEST_F(TestFlatFLBAScanner, TestFLBAPrinterNext) {
   NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
       Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
   const ColumnDescriptor d(type, 4, 0);
-  MakePages(&d, 1, 100);
+  num_values_ = MakePages<FLBAType>(&d, 1, 100, def_levels_, rep_levels_, values_,
+      data_buffer_, pages_);
+  num_levels_ = 1 * 100;
   InitScanner(&d);
   TypedScanner<FLBAType::type_num>* scanner =
     reinterpret_cast<TypedScanner<FLBAType::type_num>* >(scanner_.get());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/70f5088f/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 36e9860..c9b08c2 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -23,6 +23,7 @@
 #define PARQUET_COLUMN_TEST_UTIL_H
 
 #include <algorithm>
+#include <limits>
 #include <memory>
 #include <vector>
 #include <string>
@@ -32,40 +33,68 @@
 
 // Depended on by SerializedPageReader test utilities for now
 #include "parquet/encodings/plain-encoding.h"
+#include "parquet/encodings/dictionary-encoding.h"
 #include "parquet/util/input.h"
 #include "parquet/util/test-common.h"
 
+using std::vector;
+using std::shared_ptr;
+
 namespace parquet_cpp {
 
 namespace test {
 
+template <typename T>
+static void InitValues(int num_values, vector<T>& values,
+    vector<uint8_t>& buffer) {
+  random_numbers(num_values, 0, std::numeric_limits<T>::min(),
+      std::numeric_limits<T>::max(), values.data());
+}
+
+template <typename T>
+static void InitDictValues(int num_values, int num_dicts,
+    vector<T>& values, vector<uint8_t>& buffer) {
+  int repeat_factor = num_values / num_dicts;
+  InitValues<T>(num_dicts, values, buffer);
+  // add some repeated values
+  for (int j = 1; j < repeat_factor; ++j) {
+    for (int i = 0; i < num_dicts; ++i) {
+      std::memcpy(&values[num_dicts * j + i], &values[i], sizeof(T));
+    }
+  }
+  // computed only dict_per_page * repeat_factor - 1 values < num_values
+  // compute remaining
+  for (int i = num_dicts * repeat_factor; i < num_values; ++i) {
+    std::memcpy(&values[i], &values[i - num_dicts * repeat_factor], sizeof(T));
+  }
+}
+
 class MockPageReader : public PageReader {
  public:
-  explicit MockPageReader(const std::vector<std::shared_ptr<Page> >& pages) :
+  explicit MockPageReader(const vector<shared_ptr<Page> >& pages) :
       pages_(pages),
       page_index_(0) {}
 
   // Implement the PageReader interface
-  virtual std::shared_ptr<Page> NextPage() {
+  virtual shared_ptr<Page> NextPage() {
     if (page_index_ == static_cast<int>(pages_.size())) {
       // EOS to consumer
-      return std::shared_ptr<Page>(nullptr);
+      return shared_ptr<Page>(nullptr);
     }
     return pages_[page_index_++];
   }
 
  private:
-  std::vector<std::shared_ptr<Page> > pages_;
+  vector<shared_ptr<Page> > pages_;
   int page_index_;
 };
 
 // TODO(wesm): this is only used for testing for now. Refactor to form part of
 // primary file write path
-
-template <int TYPE>
+template <typename Type>
 class DataPageBuilder {
  public:
-  typedef typename type_traits<TYPE>::value_type T;
+  typedef typename Type::c_type T;
 
   // This class writes data and metadata to the passed inputs
   explicit DataPageBuilder(InMemoryOutputStream* sink) :
@@ -79,7 +108,7 @@ class DataPageBuilder {
       have_values_(false) {
   }
 
-  void AppendDefLevels(const std::vector<int16_t>& levels, int16_t max_level,
+  void AppendDefLevels(const vector<int16_t>& levels, int16_t max_level,
       Encoding::type encoding = Encoding::RLE) {
     AppendLevels(levels, max_level, encoding);
 
@@ -88,7 +117,7 @@ class DataPageBuilder {
     have_def_levels_ = true;
   }
 
-  void AppendRepLevels(const std::vector<int16_t>& levels, int16_t max_level,
+  void AppendRepLevels(const vector<int16_t>& levels, int16_t max_level,
       Encoding::type encoding = Encoding::RLE) {
     AppendLevels(levels, max_level, encoding);
 
@@ -97,12 +126,9 @@ class DataPageBuilder {
     have_rep_levels_ = true;
   }
 
-  void AppendValues(const ColumnDescriptor *d, const std::vector<T>& values,
+  void AppendValues(const ColumnDescriptor *d, const vector<T>& values,
       Encoding::type encoding = Encoding::PLAIN) {
-    if (encoding != Encoding::PLAIN) {
-      ParquetException::NYI("only plain encoding currently implemented");
-    }
-    PlainEncoder<TYPE> encoder(d);
+    PlainEncoder<Type::type_num> encoder(d);
     encoder.Encode(&values[0], values.size(), sink_);
 
     num_values_ = std::max(static_cast<int32_t>(values.size()), num_values_);
@@ -139,14 +165,14 @@ class DataPageBuilder {
   bool have_values_;
 
   // Used internally for both repetition and definition levels
-  void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
+  void AppendLevels(const vector<int16_t>& levels, int16_t max_level,
       Encoding::type encoding) {
     if (encoding != Encoding::RLE) {
       ParquetException::NYI("only rle encoding currently implemented");
     }
 
     // TODO: compute a more precise maximum size for the encoded levels
-    std::vector<uint8_t> encode_buffer(levels.size() * 4);
+    vector<uint8_t> encode_buffer(levels.size() * 2);
 
     // We encode into separate memory from the output stream because the
     // RLE-encoded bytes have to be preceded in the stream by their absolute
@@ -164,8 +190,8 @@ class DataPageBuilder {
 };
 
 template<>
-void DataPageBuilder<Type::BOOLEAN>::AppendValues(const ColumnDescriptor *d,
-    const std::vector<bool>& values, Encoding::type encoding) {
+void DataPageBuilder<BooleanType>::AppendValues(const ColumnDescriptor *d,
+    const vector<bool>& values, Encoding::type encoding) {
   if (encoding != Encoding::PLAIN) {
     ParquetException::NYI("only plain encoding currently implemented");
   }
@@ -177,39 +203,180 @@ void DataPageBuilder<Type::BOOLEAN>::AppendValues(const ColumnDescriptor *d,
   have_values_ = true;
 }
 
-template <int TYPE, typename T>
-static std::shared_ptr<DataPage> MakeDataPage(const ColumnDescriptor *d,
-    const std::vector<T>& values,
-    const std::vector<int16_t>& def_levels, int16_t max_def_level,
-    const std::vector<int16_t>& rep_levels, int16_t max_rep_level) {
+template <typename Type>
+static shared_ptr<DataPage> MakeDataPage(const ColumnDescriptor *d,
+    const vector<typename Type::c_type>& values, int num_vals,
+    Encoding::type encoding, const uint8_t* indices, int indices_size,
+    const vector<int16_t>& def_levels, int16_t max_def_level,
+    const vector<int16_t>& rep_levels, int16_t max_rep_level) {
+  int num_values = 0;
+
   InMemoryOutputStream page_stream;
-  test::DataPageBuilder<TYPE> page_builder(&page_stream);
+  test::DataPageBuilder<Type> page_builder(&page_stream);
 
   if (!rep_levels.empty()) {
     page_builder.AppendRepLevels(rep_levels, max_rep_level);
   }
-
   if (!def_levels.empty()) {
     page_builder.AppendDefLevels(def_levels, max_def_level);
   }
 
-  page_builder.AppendValues(d, values);
+  if (encoding == Encoding::PLAIN) {
+    page_builder.AppendValues(d, values, encoding);
+    num_values = page_builder.num_values();
+  } else {// DICTIONARY PAGES
+    page_stream.Write(indices, indices_size);
+    num_values = std::max(page_builder.num_values(), num_vals);
+  }
 
   auto buffer = page_stream.GetBuffer();
 
-  return std::make_shared<DataPage>(buffer, page_builder.num_values(),
-      page_builder.encoding(),
+  return std::make_shared<DataPage>(buffer, num_values,
+      encoding,
       page_builder.def_level_encoding(),
       page_builder.rep_level_encoding());
 }
 
-template <int TYPE, typename T>
-static void Paginate(const ColumnDescriptor *d,
-    const std::vector<T>& values,
-    const std::vector<int16_t>& def_levels, int16_t max_def_level,
-    const std::vector<int16_t>& rep_levels, int16_t max_rep_level,
-    int num_levels_per_page, const std::vector<int>& values_per_page,
-    std::vector<std::shared_ptr<Page> >& pages) {
+
+template <typename TYPE>
+class DictionaryPageBuilder {
+ public:
+  typedef typename TYPE::c_type TC;
+  static constexpr int TN = TYPE::type_num;
+
+  // This class writes data and metadata to the passed inputs
+  explicit DictionaryPageBuilder(const ColumnDescriptor *d) :
+      num_dict_values_(0),
+      have_values_(false) {
+        int type_length = 0;
+        if (TN == Type::FIXED_LEN_BYTE_ARRAY) {
+          type_length = d->type_length();
+        }
+        encoder_.reset(new DictEncoder<TC>(&pool_, type_length));
+  }
+
+  ~DictionaryPageBuilder() {
+    pool_.FreeAll();
+  }
+
+  shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
+    shared_ptr<OwnedMutableBuffer> rle_indices = std::make_shared<OwnedMutableBuffer>();
+    int num_values = values.size();
+    // Dictionary encoding
+    for (int i = 0; i < num_values; ++i) {
+      encoder_->Put(values[i]);
+    }
+    num_dict_values_ = encoder_->num_entries();
+    have_values_ = true;
+    rle_indices->Resize(sizeof(int) * encoder_->EstimatedDataEncodedSize());
+    int actual_bytes = encoder_->WriteIndices(rle_indices->mutable_data(),
+        rle_indices->size());
+    rle_indices->Resize(actual_bytes);
+    encoder_->ClearIndices();
+    return rle_indices;
+  }
+
+  shared_ptr<Buffer> WriteDict() {
+    shared_ptr<OwnedMutableBuffer> dict_buffer = std::make_shared<OwnedMutableBuffer>();
+    dict_buffer->Resize(encoder_->dict_encoded_size());
+    encoder_->WriteDict(dict_buffer->mutable_data());
+    return dict_buffer;
+  }
+
+  int32_t num_values() const {
+    return num_dict_values_;
+  }
+
+ private:
+  MemPool pool_;
+  shared_ptr<DictEncoder<TC> > encoder_;
+  int32_t num_dict_values_;
+  bool have_values_;
+};
+
+template<>
+DictionaryPageBuilder<BooleanType>::DictionaryPageBuilder(const ColumnDescriptor *d) {
+  ParquetException::NYI("only plain encoding currently implemented for boolean");
+}
+
+template<>
+shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::WriteDict() {
+  ParquetException::NYI("only plain encoding currently implemented for boolean");
+  return nullptr;
+}
+
+template<>
+shared_ptr<Buffer> DictionaryPageBuilder<BooleanType>::AppendValues(
+    const vector<TC>& values) {
+  ParquetException::NYI("only plain encoding currently implemented for boolean");
+  return nullptr;
+}
+
+template <typename Type>
+static shared_ptr<DictionaryPage> MakeDictPage(const ColumnDescriptor *d,
+    const vector<typename Type::c_type>& values, const vector<int>& values_per_page,
+    Encoding::type encoding, vector<shared_ptr<Buffer> >& rle_indices) {
+  InMemoryOutputStream page_stream;
+  test::DictionaryPageBuilder<Type> page_builder(d);
+  int num_pages = values_per_page.size();
+  int value_start = 0;
+
+  for (int i = 0; i < num_pages; i++) {
+    rle_indices.push_back(page_builder.AppendValues(slice(values, value_start,
+          value_start + values_per_page[i])));
+    value_start += values_per_page[i];
+  }
+
+  auto buffer = page_builder.WriteDict();
+
+  return std::make_shared<DictionaryPage>(buffer, page_builder.num_values(),
+      Encoding::PLAIN);
+}
+
+// Given def/rep levels and values create multiple dict pages
+template <typename Type>
+static void PaginateDict(const ColumnDescriptor *d,
+    const vector<typename Type::c_type>& values,
+    const vector<int16_t>& def_levels, int16_t max_def_level,
+    const vector<int16_t>& rep_levels, int16_t max_rep_level,
+    int num_levels_per_page, const vector<int>& values_per_page,
+    vector<shared_ptr<Page> >& pages,
+    Encoding::type encoding = Encoding::RLE_DICTIONARY) {
+  int num_pages = values_per_page.size();
+  vector<shared_ptr<Buffer> > rle_indices;
+  shared_ptr<DictionaryPage> dict_page = MakeDictPage<Type>(d, values, values_per_page,
+      encoding, rle_indices);
+  pages.push_back(dict_page);
+  int def_level_start = 0;
+  int def_level_end = 0;
+  int rep_level_start = 0;
+  int rep_level_end = 0;
+  for (int i = 0; i < num_pages; i++) {
+    if (max_def_level > 0) {
+      def_level_start = i * num_levels_per_page;
+      def_level_end = (i + 1) * num_levels_per_page;
+    }
+    if (max_rep_level > 0) {
+      rep_level_start = i * num_levels_per_page;
+      rep_level_end = (i + 1) * num_levels_per_page;
+    }
+    shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(d, {}, values_per_page[i],
+        encoding, rle_indices[i]->data(), rle_indices[i]->size(),
+        slice(def_levels, def_level_start, def_level_end), max_def_level,
+        slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
+    pages.push_back(data_page);
+  }
+}
+
+// Given def/rep levels and values create multiple plain pages
+template <typename Type>
+static void PaginatePlain(const ColumnDescriptor *d,
+    const vector<typename Type::c_type>& values,
+    const vector<int16_t>& def_levels, int16_t max_def_level,
+    const vector<int16_t>& rep_levels, int16_t max_rep_level,
+    int num_levels_per_page, const vector<int>& values_per_page,
+    vector<shared_ptr<Page> >& pages,
+    Encoding::type encoding = Encoding::PLAIN) {
   int num_pages = values_per_page.size();
   int def_level_start = 0;
   int def_level_end = 0;
@@ -225,8 +392,9 @@ static void Paginate(const ColumnDescriptor *d,
       rep_level_start = i * num_levels_per_page;
       rep_level_end = (i + 1) * num_levels_per_page;
     }
-    std::shared_ptr<DataPage> page = MakeDataPage<TYPE>(d,
-        slice(values, value_start, value_start + values_per_page[i]),
+    shared_ptr<DataPage> page = MakeDataPage<Type>(d,
+        slice(values, value_start, value_start + values_per_page[i]), values_per_page[i],
+        encoding, NULL, 0,
         slice(def_levels, def_level_start, def_level_end), max_def_level,
         slice(rep_levels, rep_level_start, rep_level_end), max_rep_level);
     pages.push_back(page);
@@ -234,6 +402,58 @@ static void Paginate(const ColumnDescriptor *d,
   }
 }
 
+// Generates pages from randomly generated data
+template <typename Type>
+static int MakePages(const ColumnDescriptor *d, int num_pages, int levels_per_page,
+    vector<int16_t>& def_levels, vector<int16_t>& rep_levels,
+    vector<typename Type::c_type>& values, vector<uint8_t>& buffer,
+    vector<shared_ptr<Page> >& pages,
+    Encoding::type encoding = Encoding::PLAIN) {
+  int num_levels = levels_per_page * num_pages;
+  int num_values = 0;
+  uint32_t seed = 0;
+  int16_t zero = 0;
+  int16_t max_def_level = d->max_definition_level();
+  int16_t max_rep_level = d->max_repetition_level();
+  vector<int> values_per_page(num_pages, levels_per_page);
+  // Create definition levels
+  if (max_def_level > 0) {
+    def_levels.resize(num_levels);
+    random_numbers(num_levels, seed, zero, max_def_level, def_levels.data());
+    for (int p = 0; p < num_pages; p++) {
+      int num_values_per_page = 0;
+      for (int i = 0; i < levels_per_page; i++) {
+        if (def_levels[i + p * levels_per_page] == max_def_level) {
+          num_values_per_page++;
+          num_values++;
+        }
+      }
+      values_per_page[p] = num_values_per_page;
+    }
+  } else {
+    num_values = num_levels;
+  }
+  // Create repitition levels
+  if (max_rep_level > 0) {
+    rep_levels.resize(num_levels);
+    random_numbers(num_levels, seed, zero, max_rep_level, rep_levels.data());
+  }
+  // Create values
+  values.resize(num_values);
+  if (encoding == Encoding::PLAIN) {
+    InitValues<typename Type::c_type>(num_values, values, buffer);
+    PaginatePlain<Type>(d, values, def_levels, max_def_level,
+        rep_levels, max_rep_level, levels_per_page, values_per_page, pages);
+  } else if (encoding == Encoding::RLE_DICTIONARY) {
+    // Calls InitValues and repeats the data
+    InitDictValues<typename Type::c_type>(num_values, levels_per_page, values, buffer);
+    PaginateDict<Type>(d, values, def_levels, max_def_level,
+        rep_levels, max_rep_level, levels_per_page, values_per_page, pages);
+  }
+
+  return num_values;
+}
+
 } // namespace test
 
 } // namespace parquet_cpp


Mime
View raw message