parquet-commits mailing list archives

From jul...@apache.org
Subject [2/2] parquet-cpp git commit: PARQUET-494: Implement DictionaryEncoder and test dictionary decoding
Date Fri, 26 Feb 2016 17:52:51 GMT
PARQUET-494: Implement DictionaryEncoder and test dictionary decoding

I incorporated quite a bit of code from Impala for this patch, but did a bunch of work to get everything working. In particular, I wasn't happy with the hash table implementation in `dict-encoder.h` and so have written a simple new one that we can benchmark and tune as necessary.
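The idea behind that table can be sketched in a few lines. This is a hypothetical, self-contained illustration (names like `SimpleDictTable` are mine, not from the patch): a fixed-size open-addressing table whose slots hold indices into a vector of unique values, with linear probing and doubling when the load factor gets too high.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <limits>
#include <vector>

// Illustrative sketch, not the actual parquet-cpp DictEncoder. Slots map a
// hash bucket to an index into uniques_; an empty slot is marked with a
// sentinel. Table size stays a power of two so `hash & (size - 1)` replaces
// the slower `hash % size`.
template <typename T>
class SimpleDictTable {
 public:
  static constexpr int32_t kEmpty = std::numeric_limits<int32_t>::max();

  SimpleDictTable() : slots_(1024, kEmpty) {}

  // Returns the dictionary index of v, inserting it if unseen.
  int32_t GetOrInsert(const T& v) {
    size_t j = Hash(v) & (slots_.size() - 1);
    while (slots_[j] != kEmpty && uniques_[slots_[j]] != v) {
      j = (j + 1) & (slots_.size() - 1);  // linear probing
    }
    if (slots_[j] == kEmpty) {
      slots_[j] = static_cast<int32_t>(uniques_.size());
      uniques_.push_back(v);
      // Rebuild at ~0.7 load factor to keep probe chains short.
      if (uniques_.size() > slots_.size() * 7 / 10) Grow();
    }
    return slots_[j];
  }

  size_t num_uniques() const { return uniques_.size(); }

 private:
  size_t Hash(const T& v) const {
    // Placeholder hash; the real encoder uses its own HashUtil routines.
    return std::hash<T>()(v);
  }

  void Grow() {
    std::vector<int32_t> bigger(slots_.size() * 2, kEmpty);
    for (int32_t index : slots_) {
      if (index == kEmpty) continue;
      size_t j = Hash(uniques_[index]) & (bigger.size() - 1);
      while (bigger[j] != kEmpty) j = (j + 1) & (bigger.size() - 1);
      bigger[j] = index;  // uniques are distinct, so no value compare needed
    }
    slots_.swap(bigger);
  }

  std::vector<int32_t> slots_;
  std::vector<T> uniques_;
};
```

Repeated values map back to their original index, which is exactly what buffers the small dictionary indices that later get RLE/bit-packed.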

The simplest way to pull in the DictEncoder (PARQUET-493) was to also bring in the `MemPool` implementation, suitably trimmed down. We can continue to refactor this as needed for parquet-cpp.
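For readers unfamiliar with the `MemPool` pattern, a minimal arena-style sketch (again illustrative, not the imported Impala code) captures the contract the dict encoder relies on: allocations are bump-pointer carved out of large chunks, pointers stay valid for the pool's lifetime, and everything is freed at once on destruction.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical trimmed-down pool. The real MemPool tracks chunks and sizes
// more carefully; this only shows the bump-pointer allocation idea.
class ArenaPool {
 public:
  explicit ArenaPool(size_t chunk_size = 4096) : chunk_size_(chunk_size) {}

  uint8_t* Allocate(size_t nbytes) {
    if (nbytes == 0) return nullptr;
    // Start a new chunk if the current one cannot fit the request.
    if (chunks_.empty() || offset_ + nbytes > chunks_.back().size()) {
      chunks_.emplace_back(std::max(chunk_size_, nbytes));
      offset_ = 0;
    }
    uint8_t* out = chunks_.back().data() + offset_;
    offset_ += nbytes;
    total_allocated_ += nbytes;
    return out;
  }

  size_t total_allocated() const { return total_allocated_; }

 private:
  size_t chunk_size_;
  size_t offset_ = 0;
  size_t total_allocated_ = 0;
  // Moving the inner vectors on growth preserves their heap buffers, so
  // previously returned pointers remain valid.
  std::vector<std::vector<uint8_t>> chunks_;
};
```

This is why `DictEncoder<ByteArray>::AddDictKey` below can copy variable-length values into the pool and keep raw pointers to them in `uniques_`.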

I also did some light refactoring using `TYPED_TEST` in `plain-encoding-test` (now `encoding-test`).
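The benefit of `TYPED_TEST` is that one templated test body is instantiated once per type in a compile-time list. A gtest-free sketch of the same mechanism (function names are illustrative, not the fixtures in `encoding-test.cc`):

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Stand-in for an encode/decode round trip; a real test would run values
// through an Encoder and Decoder pair and compare the results.
template <typename T>
int RoundTripCount(const std::vector<T>& values) {
  std::vector<T> decoded(values.begin(), values.end());
  return static_cast<int>(decoded.size());
}

// The "test body", written once against a generic T.
template <typename T>
bool RunEncodingTest() {
  std::vector<T> values(100, T(1));
  return RoundTripCount(values) == 100;
}

// Expand the body over every type in the list, the way
// TYPED_TEST_CASE(TestFixture, ParquetTypes) does under the hood.
template <typename... Ts>
bool RunAll() {
  bool ok = true;
  ((ok = ok && RunEncodingTest<Ts>()), ...);  // C++17 fold over the pack
  return ok;
}
```

In the patch itself this shows up as the shared `ParquetTypes` list replacing the hand-rolled `TypeValue<N>` wrappers in `scanner-test.cc`.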

Author: Wes McKinney <wesm@apache.org>

Closes #64 from wesm/PARQUET-494 and squashes the following commits:

c634abe [Wes McKinney] Refactor to create TestEncoderBase
a3a563a [Wes McKinney] Consolidate dictionary encoding code
2cc4ffe [Wes McKinney] Retrieve type_length() only once in PlainDecoder ctor
20ccd9e [Wes McKinney] Remove DictionaryEncoder shim layer for now
dcfc0aa [Wes McKinney] Remove redundant Int96 comparison
d98a2c0 [Wes McKinney] Dictionary encoding for booleans throws exception
05414f0 [Wes McKinney] Test dictionary encoding more types
9a5b1a4 [Wes McKinney] Enable include_order linting per PARQUET-539
f3f0efc [Wes McKinney] IWYU cleaning
d4191c6 [Wes McKinney] Add header installs, fix clang warning
1347b13 [Wes McKinney] Rename plain-encoding-test to encoding-test
09bf0fa [Wes McKinney] Fix bugs and add dictionary repeats
2e6af48 [Wes McKinney] Fix some bugs. FixedLenByteArray remains to get working.
69b5b69 [Wes McKinney] Refactor test fixtures to be less coupled to state. Progress on getting dict encoding working
6b23716 [Wes McKinney] Create reusable DataType structs for test fixtures and other compile-time type resolution matters
67883fd [Wes McKinney] Bunch of combined work for dict encoding support:


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c6e06929
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c6e06929
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c6e06929

Branch: refs/heads/master
Commit: c6e069297a3b8d0f9ad45da04fe114d40c593115
Parents: 1df5a26
Author: Wes McKinney <wesm@apache.org>
Authored: Fri Feb 26 09:52:46 2016 -0800
Committer: Julien Le Dem <julien@dremio.com>
Committed: Fri Feb 26 09:52:46 2016 -0800

----------------------------------------------------------------------
 CMakeLists.txt                               |   2 +-
 src/parquet/column/column-reader-test.cc     |   1 +
 src/parquet/column/levels-test.cc            |   3 +-
 src/parquet/column/reader.cc                 |   5 +-
 src/parquet/column/scanner-test.cc           |  54 ++--
 src/parquet/compression/codec-test.cc        |   5 +-
 src/parquet/compression/codec.h              |   4 +-
 src/parquet/compression/lz4-codec.cc         |   3 +-
 src/parquet/compression/snappy-codec.cc      |   3 +-
 src/parquet/encodings/CMakeLists.txt         |   2 +-
 src/parquet/encodings/dictionary-encoding.h  | 311 +++++++++++++++++++++-
 src/parquet/encodings/encoder.h              |   5 -
 src/parquet/encodings/encoding-test.cc       | 309 +++++++++++++++++++++
 src/parquet/encodings/plain-encoding-test.cc | 232 ----------------
 src/parquet/encodings/plain-encoding.h       |  96 ++++---
 src/parquet/file/reader-internal.h           |   2 -
 src/parquet/reader-test.cc                   |   3 +-
 src/parquet/schema/schema-descriptor-test.cc |   3 +-
 src/parquet/types.h                          |  36 +++
 src/parquet/util/CMakeLists.txt              |  16 +-
 src/parquet/util/bit-stream-utils.h          |   2 +-
 src/parquet/util/bit-util-test.cc            |   5 +-
 src/parquet/util/buffer-builder.h            |  61 +++++
 src/parquet/util/cpu-info.cc                 |  10 +-
 src/parquet/util/dict-encoding.h             |  36 +++
 src/parquet/util/hash-util.h                 | 247 +++++++++++++++++
 src/parquet/util/mem-pool-test.cc            | 247 +++++++++++++++++
 src/parquet/util/mem-pool.cc                 | 234 ++++++++++++++++
 src/parquet/util/mem-pool.h                  | 208 +++++++++++++++
 src/parquet/util/output.h                    |   1 -
 src/parquet/util/rle-encoding.h              |   2 +-
 src/parquet/util/rle-test.cc                 |   9 +-
 src/parquet/util/sse-util.h                  |  30 +++
 src/parquet/util/stopwatch.h                 |   5 +-
 src/parquet/util/test-common.h               |  13 +-
 35 files changed, 1840 insertions(+), 365 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 218e74a..5ff9e6c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -267,7 +267,7 @@ if (UNIX)
   add_custom_target(lint ${BUILD_SUPPORT_DIR}/cpplint.py
   --verbose=2
   --linelength=90
-  --filter=-whitespace/comments,-readability/todo,-build/header_guard,-build/include_order,-runtime/references,-readability/check
+  --filter=-whitespace/comments,-readability/todo,-build/header_guard,-runtime/references,-readability/check
     `find ${CMAKE_CURRENT_SOURCE_DIR}/src -name \\*.cc -or -name \\*.h | sed -e '/parquet\\/thrift/g'`)
 endif (UNIX)
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 079201a..e64ef28 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -20,6 +20,7 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstdlib>
+#include <limits>
 #include <memory>
 #include <string>
 #include <vector>

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/levels-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/levels-test.cc b/src/parquet/column/levels-test.cc
index 0e3c20f..57aa562 100644
--- a/src/parquet/column/levels-test.cc
+++ b/src/parquet/column/levels-test.cc
@@ -15,13 +15,12 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <gtest/gtest.h>
 #include <cstdint>
 #include <memory>
 #include <vector>
 #include <string>
 
-#include <gtest/gtest.h>
-
 #include "parquet/column/levels.h"
 #include "parquet/types.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index 4011347..4cff810 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -52,8 +52,9 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
   //
   // TODO(wesm): investigate whether this all-or-nothing decoding of the
   // dictionary makes sense and whether performance can be improved
-  std::shared_ptr<DecoderType> decoder(
-      new DictionaryDecoder<TYPE>(descr_, &dictionary));
+
+  auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_);
+  decoder->SetDict(&dictionary);
 
   decoders_[encoding] = decoder;
   current_decoder_ = decoders_[encoding].get();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/column/scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc
index be6b42e..785db08 100644
--- a/src/parquet/column/scanner-test.cc
+++ b/src/parquet/column/scanner-test.cc
@@ -40,16 +40,6 @@ namespace parquet_cpp {
 
 using schema::NodePtr;
 
-bool operator==(const Int96& a, const Int96& b) {
-  return a.value[0] == b.value[0] &&
-    a.value[1] == b.value[1] &&
-    a.value[2] == b.value[2];
-}
-
-bool operator==(const ByteArray& a, const ByteArray& b) {
-  return a.len == b.len && 0 == memcmp(a.ptr, b.ptr, a.len);
-}
-
 static int FLBA_LENGTH = 12;
 bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
   return 0 == memcmp(a.ptr, b.ptr, FLBA_LENGTH);
@@ -57,16 +47,10 @@ bool operator==(const FixedLenByteArray& a, const FixedLenByteArray& b) {
 
 namespace test {
 
-template <int N> class TypeValue {
- public:
-  static const int value = N;
-};
-template <int N> const int TypeValue<N>::value;
-
-template <typename TYPE>
+template <typename Type>
 class TestFlatScanner : public ::testing::Test {
  public:
-  typedef typename type_traits<TYPE::value>::value_type T;
+  typedef typename Type::c_type T;
 
   void InitValues() {
     random_numbers(num_values_, 0, std::numeric_limits<T>::min(),
@@ -106,7 +90,7 @@ class TestFlatScanner : public ::testing::Test {
     // Create values
     values_.resize(num_values_);
     InitValues();
-    Paginate<TYPE::value>(d, values_, def_levels_, max_def_level,
+    Paginate<Type::type_num>(d, values_, def_levels_, max_def_level,
         rep_levels_, max_rep_level, levels_per_page, values_per_page, pages_);
   }
 
@@ -116,8 +100,8 @@ class TestFlatScanner : public ::testing::Test {
   }
 
   void CheckResults(int batch_size, const ColumnDescriptor *d) {
-    TypedScanner<TYPE::value>* scanner =
-      reinterpret_cast<TypedScanner<TYPE::value>* >(scanner_.get());
+    TypedScanner<Type::type_num>* scanner =
+      reinterpret_cast<TypedScanner<Type::type_num>* >(scanner_.get());
     T val;
     bool is_null;
     int16_t def_level;
@@ -158,14 +142,11 @@ class TestFlatScanner : public ::testing::Test {
   void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1,
       std::shared_ptr<ColumnDescriptor>& d2, std::shared_ptr<ColumnDescriptor>& d3) {
     NodePtr type;
-    type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
-        static_cast<Type::type>(TYPE::value));
+    type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED, Type::type_num);
     d1.reset(new ColumnDescriptor(type, 0, 0));
-    type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL,
-        static_cast<Type::type>(TYPE::value));
+    type = schema::PrimitiveNode::Make("c2", Repetition::OPTIONAL, Type::type_num);
     d2.reset(new ColumnDescriptor(type, 4, 0));
-    type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED,
-        static_cast<Type::type>(TYPE::value));
+    type = schema::PrimitiveNode::Make("c3", Repetition::REPEATED, Type::type_num);
     d3.reset(new ColumnDescriptor(type, 4, 2));
   }
 
@@ -194,18 +175,18 @@ class TestFlatScanner : public ::testing::Test {
 };
 
 template<>
-void TestFlatScanner<TypeValue<Type::BOOLEAN> >::InitValues() {
+void TestFlatScanner<BooleanType>::InitValues() {
   values_ = flip_coins(num_values_, 0);
 }
 
 template<>
-void TestFlatScanner<TypeValue<Type::INT96> >::InitValues() {
+void TestFlatScanner<Int96Type>::InitValues() {
   random_Int96_numbers(num_values_, 0, std::numeric_limits<int32_t>::min(),
       std::numeric_limits<int32_t>::max(), values_.data());
 }
 
 template<>
-void TestFlatScanner<TypeValue<Type::BYTE_ARRAY> >::InitValues() {
+void TestFlatScanner<ByteArrayType>::InitValues() {
   int max_byte_array_len = 12;
   int num_bytes = max_byte_array_len + sizeof(uint32_t);
   size_t nbytes = num_values_ * num_bytes;
@@ -215,7 +196,7 @@ void TestFlatScanner<TypeValue<Type::BYTE_ARRAY> >::InitValues() {
 }
 
 template<>
-void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitValues() {
+void TestFlatScanner<FLBAType>::InitValues() {
   size_t nbytes = num_values_ * FLBA_LENGTH;
   data_buffer_.resize(nbytes);
   random_fixed_byte_array(num_values_, 0, data_buffer_.data(), FLBA_LENGTH,
@@ -223,7 +204,7 @@ void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitValues() {
 }
 
 template<>
-void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitDescriptors(
+void TestFlatScanner<FLBAType>::InitDescriptors(
     std::shared_ptr<ColumnDescriptor>& d1, std::shared_ptr<ColumnDescriptor>& d2,
     std::shared_ptr<ColumnDescriptor>& d3) {
   NodePtr type = schema::PrimitiveNode::MakeFLBA("c1", Repetition::REQUIRED,
@@ -237,18 +218,13 @@ void TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY> >::InitDescriptors(
   d3.reset(new ColumnDescriptor(type, 4, 2));
 }
 
-typedef TestFlatScanner<TypeValue<Type::FIXED_LEN_BYTE_ARRAY>> TestFlatFLBAScanner;
+typedef TestFlatScanner<FLBAType> TestFlatFLBAScanner;
 
 static int num_levels_per_page = 100;
 static int num_pages = 20;
 static int batch_size = 32;
 
-typedef ::testing::Types<TypeValue<Type::BOOLEAN>, TypeValue<Type::INT32>,
-    TypeValue<Type::INT64>, TypeValue<Type::INT96>, TypeValue<Type::FLOAT>,
-    TypeValue<Type::DOUBLE>, TypeValue<Type::BYTE_ARRAY>,
-    TypeValue<Type::FIXED_LEN_BYTE_ARRAY> > Primitives;
-
-TYPED_TEST_CASE(TestFlatScanner, Primitives);
+TYPED_TEST_CASE(TestFlatScanner, ParquetTypes);
 
 TYPED_TEST(TestFlatScanner, TestScanner) {
   this->ExecuteAll(num_pages, num_levels_per_page, batch_size);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/codec-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec-test.cc b/src/parquet/compression/codec-test.cc
index 610fb37..285559a 100644
--- a/src/parquet/compression/codec-test.cc
+++ b/src/parquet/compression/codec-test.cc
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <gtest/gtest.h>
 #include <cstdint>
 #include <string>
 #include <vector>
 
-#include <gtest/gtest.h>
-#include "parquet/util/test-common.h"
-
 #include "parquet/compression/codec.h"
+#include "parquet/util/test-common.h"
 
 using std::string;
 using std::vector;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/codec.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h
index bc73f02..df15d61 100644
--- a/src/parquet/compression/codec.h
+++ b/src/parquet/compression/codec.h
@@ -18,11 +18,11 @@
 #ifndef PARQUET_COMPRESSION_CODEC_H
 #define PARQUET_COMPRESSION_CODEC_H
 
+#include <zlib.h>
+
 #include <cstdint>
 #include <memory>
 
-#include <zlib.h>
-
 #include "parquet/exception.h"
 #include "parquet/types.h"
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/lz4-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/lz4-codec.cc b/src/parquet/compression/lz4-codec.cc
index a131031..81413bb 100644
--- a/src/parquet/compression/lz4-codec.cc
+++ b/src/parquet/compression/lz4-codec.cc
@@ -15,11 +15,10 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "parquet/compression/codec.h"
-
 #include <lz4.h>
 #include <cstdint>
 
+#include "parquet/compression/codec.h"
 #include "parquet/exception.h"
 
 namespace parquet_cpp {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/compression/snappy-codec.cc
----------------------------------------------------------------------
diff --git a/src/parquet/compression/snappy-codec.cc b/src/parquet/compression/snappy-codec.cc
index 91590db..991dd04 100644
--- a/src/parquet/compression/snappy-codec.cc
+++ b/src/parquet/compression/snappy-codec.cc
@@ -15,12 +15,11 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "parquet/compression/codec.h"
-
 #include <snappy.h>
 #include <cstdint>
 #include <cstdlib>
 
+#include "parquet/compression/codec.h"
 #include "parquet/exception.h"
 
 namespace parquet_cpp {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt
index c9349af..eb4cc3c 100644
--- a/src/parquet/encodings/CMakeLists.txt
+++ b/src/parquet/encodings/CMakeLists.txt
@@ -26,4 +26,4 @@ install(FILES
   plain-encoding.h
   DESTINATION include/parquet/encodings)
 
-ADD_PARQUET_TEST(plain-encoding-test)
+ADD_PARQUET_TEST(encoding-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index b52aefb..eed0659 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -20,10 +20,16 @@
 
 #include <algorithm>
 #include <cstdint>
+#include <iostream>
+#include <limits>
 #include <vector>
 
 #include "parquet/encodings/decoder.h"
 #include "parquet/encodings/encoder.h"
+#include "parquet/encodings/plain-encoding.h"
+#include "parquet/util/dict-encoding.h"
+#include "parquet/util/hash-util.h"
+#include "parquet/util/mem-pool.h"
 #include "parquet/util/rle-encoding.h"
 
 namespace parquet_cpp {
@@ -36,14 +42,12 @@ class DictionaryDecoder : public Decoder<TYPE> {
   // Initializes the dictionary with values from 'dictionary'. The data in
   // dictionary is not guaranteed to persist in memory after this call so the
   // dictionary decoder needs to copy the data out if necessary.
-  DictionaryDecoder(const ColumnDescriptor* descr,
-      Decoder<TYPE>* dictionary)
+  explicit DictionaryDecoder(const ColumnDescriptor* descr)
       : Decoder<TYPE>(descr, Encoding::RLE_DICTIONARY) {
-    Init(dictionary);
   }
 
   // Perform type-specific initiatialization
-  void Init(Decoder<TYPE>* dictionary);
+  void SetDict(Decoder<TYPE>* dictionary);
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
     num_values_ = num_values;
@@ -83,20 +87,20 @@ class DictionaryDecoder : public Decoder<TYPE> {
 };
 
 template <int TYPE>
-inline void DictionaryDecoder<TYPE>::Init(Decoder<TYPE>* dictionary) {
+inline void DictionaryDecoder<TYPE>::SetDict(Decoder<TYPE>* dictionary) {
   int num_dictionary_values = dictionary->values_left();
   dictionary_.resize(num_dictionary_values);
   dictionary->Decode(&dictionary_[0], num_dictionary_values);
 }
 
 template <>
-inline void DictionaryDecoder<Type::BOOLEAN>::Init(
+inline void DictionaryDecoder<Type::BOOLEAN>::SetDict(
     Decoder<Type::BOOLEAN>* dictionary) {
   ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
 }
 
 template <>
-inline void DictionaryDecoder<Type::BYTE_ARRAY>::Init(
+inline void DictionaryDecoder<Type::BYTE_ARRAY>::SetDict(
     Decoder<Type::BYTE_ARRAY>* dictionary) {
   int num_dictionary_values = dictionary->values_left();
   dictionary_.resize(num_dictionary_values);
@@ -116,7 +120,7 @@ inline void DictionaryDecoder<Type::BYTE_ARRAY>::Init(
 }
 
 template <>
-inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Init(
+inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::SetDict(
     Decoder<Type::FIXED_LEN_BYTE_ARRAY>* dictionary) {
   int num_dictionary_values = dictionary->values_left();
   dictionary_.resize(num_dictionary_values);
@@ -134,6 +138,297 @@ inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Init(
   }
 }
 
+// ----------------------------------------------------------------------
+// Dictionary encoder
+
+// Initially imported from Apache Impala on 2016-02-22, and has been modified
+// since for parquet-cpp
+
+// Initially 1024 elements
+static constexpr int INITIAL_HASH_TABLE_SIZE = 1 << 10;
+
+typedef int32_t hash_slot_t;
+static constexpr hash_slot_t HASH_SLOT_EMPTY = std::numeric_limits<int32_t>::max();
+
+// The maximum load factor for the hash table before resizing.
+static constexpr double MAX_HASH_LOAD = 0.7;
+
+/// See the dictionary encoding section of https://github.com/Parquet/parquet-format.
+/// The encoding supports streaming encoding. Values are encoded as they are added while
+/// the dictionary is being constructed. At any time, the buffered values can be
+/// written out with the current dictionary size. More values can then be added to
+/// the encoder, including new dictionary entries.
+class DictEncoderBase {
+ public:
+  virtual ~DictEncoderBase() {
+    DCHECK(buffered_indices_.empty());
+  }
+
+  /// Writes out the encoded dictionary to buffer. buffer must be preallocated to
+  /// dict_encoded_size() bytes.
+  virtual void WriteDict(uint8_t* buffer) = 0;
+
+  /// The number of entries in the dictionary.
+  virtual int num_entries() const = 0;
+
+  /// Clears all the indices (but leaves the dictionary).
+  void ClearIndices() { buffered_indices_.clear(); }
+
+  /// Returns a conservative estimate of the number of bytes needed to encode the buffered
+  /// indices. Used to size the buffer passed to WriteIndices().
+  int EstimatedDataEncodedSize() {
+    return 1 + RleEncoder::MaxBufferSize(bit_width(), buffered_indices_.size());
+  }
+
+  /// The minimum bit width required to encode the currently buffered indices.
+  int bit_width() const {
+    if (UNLIKELY(num_entries() == 0)) return 0;
+    if (UNLIKELY(num_entries() == 1)) return 1;
+    return BitUtil::Log2(num_entries());
+  }
+
+  /// Writes out any buffered indices to buffer preceded by the bit width of this data.
+  /// Returns the number of bytes written.
+  /// If the supplied buffer is not big enough, returns -1.
+  /// buffer must be preallocated with buffer_len bytes. Use EstimatedDataEncodedSize()
+  /// to size buffer.
+  int WriteIndices(uint8_t* buffer, int buffer_len);
+
+  int hash_table_size() { return hash_table_size_; }
+  int dict_encoded_size() { return dict_encoded_size_; }
+
+ protected:
+  explicit DictEncoderBase(MemPool* pool) :
+      hash_table_size_(INITIAL_HASH_TABLE_SIZE),
+      mod_bitmask_(hash_table_size_ - 1),
+      hash_slots_(hash_table_size_, HASH_SLOT_EMPTY),
+      pool_(pool),
+      dict_encoded_size_(0) {}
+
+  /// Size of the table. Must be a power of 2.
+  int hash_table_size_;
+
+  // Store hash_table_size_ - 1, so that j & mod_bitmask_ is equivalent to j %
+  // hash_table_size_, but uses far fewer CPU cycles
+  int mod_bitmask_;
+
+  // We use a fixed-size hash table with linear probing
+  //
+  // These values correspond to the uniques_ array
+  std::vector<hash_slot_t> hash_slots_;
+
+  // For ByteArray / FixedLenByteArray data. Not owned
+  MemPool* pool_;
+
+  /// Indices that have not yet be written out by WriteIndices().
+  std::vector<int> buffered_indices_;
+
+  /// The number of bytes needed to encode the dictionary.
+  int dict_encoded_size_;
+};
+
+template <typename T>
+class DictEncoder : public DictEncoderBase {
+ public:
+  explicit DictEncoder(MemPool* pool = nullptr, int type_length = -1) :
+      DictEncoderBase(pool),
+      type_length_(type_length) { }
+
+  // TODO(wesm): think about how to address the construction semantics in
+  // encodings/dictionary-encoding.h
+  void set_mem_pool(MemPool* pool) {
+    pool_ = pool;
+  }
+
+  void set_type_length(int type_length) {
+    type_length_ = type_length;
+  }
+
+  /// Encode value. Note that this does not actually write any data, just
+  /// buffers the value's index to be written later.
+  void Put(const T& value);
+
+  virtual void WriteDict(uint8_t* buffer);
+
+  virtual int num_entries() const { return uniques_.size(); }
+
+ private:
+  // The unique observed values
+  std::vector<T> uniques_;
+
+  bool SlotDifferent(const T& v, hash_slot_t slot);
+  void DoubleTableSize();
+
+  /// Size of each encoded dictionary value. -1 for variable-length types.
+  int type_length_;
+
+  /// Hash function for mapping a value to a bucket.
+  inline uint32_t Hash(const T& value) const;
+
+  /// Adds value to the hash table and updates dict_encoded_size_
+  void AddDictKey(const T& value);
+};
+
+template<typename T>
+inline uint32_t DictEncoder<T>::Hash(const T& value) const {
+  return HashUtil::Hash(&value, sizeof(value), 0);
+}
+
+template<>
+inline uint32_t DictEncoder<ByteArray>::Hash(const ByteArray& value) const {
+  return HashUtil::Hash(value.ptr, value.len, 0);
+}
+
+template<>
+inline uint32_t DictEncoder<FixedLenByteArray>::Hash(
+    const FixedLenByteArray& value) const {
+  return HashUtil::Hash(value.ptr, type_length_, 0);
+}
+
+template <typename T>
+inline bool DictEncoder<T>::SlotDifferent(const T& v, hash_slot_t slot) {
+  return v != uniques_[slot];
+}
+
+template <>
+inline bool DictEncoder<FixedLenByteArray>::SlotDifferent(
+    const FixedLenByteArray& v, hash_slot_t slot) {
+  return 0 != memcmp(v.ptr, uniques_[slot].ptr, type_length_);
+}
+
+template <typename T>
+inline void DictEncoder<T>::Put(const T& v) {
+  uint32_t j = Hash(v) & mod_bitmask_;
+  hash_slot_t index = hash_slots_[j];
+
+  // Find an empty slot
+  while (HASH_SLOT_EMPTY != index && SlotDifferent(v, index)) {
+    // Linear probing
+    ++j;
+    if (j == hash_table_size_) j = 0;
+    index = hash_slots_[j];
+  }
+
+  int bytes_added = 0;
+  if (index == HASH_SLOT_EMPTY) {
+    // Not in the hash table, so we insert it now
+    index = uniques_.size();
+    hash_slots_[j] = index;
+    AddDictKey(v);
+
+    if (UNLIKELY(uniques_.size() >
+            static_cast<size_t>(hash_table_size_ * MAX_HASH_LOAD))) {
+      DoubleTableSize();
+    }
+  }
+
+  buffered_indices_.push_back(index);
+}
+
+template <typename T>
+inline void DictEncoder<T>::DoubleTableSize() {
+  int new_size = hash_table_size_ * 2;
+  std::vector<hash_slot_t> new_hash_slots(new_size, HASH_SLOT_EMPTY);
+  hash_slot_t index, slot;
+  uint32_t j;
+  for (int i = 0; i < hash_table_size_; ++i) {
+    index = hash_slots_[i];
+
+    if (index == HASH_SLOT_EMPTY) {
+      continue;
+    }
+
+    // Compute the hash value mod the new table size to start looking for an
+    // empty slot
+    const T& v = uniques_[index];
+
+    // Find an empty slot in the new hash table
+    j = Hash(v) & (new_size - 1);
+    slot = new_hash_slots[j];
+    while (HASH_SLOT_EMPTY != slot && SlotDifferent(v, slot)) {
+      ++j;
+      if (j == new_size) j = 0;
+      slot = new_hash_slots[j];
+    }
+
+    // Copy the old slot index to the new hash table
+    new_hash_slots[j] = index;
+  }
+
+  hash_table_size_ = new_size;
+  mod_bitmask_ = new_size - 1;
+  new_hash_slots.swap(hash_slots_);
+}
+
+template<typename T>
+inline void DictEncoder<T>::AddDictKey(const T& v) {
+  uniques_.push_back(v);
+  dict_encoded_size_ += sizeof(T);
+}
+
+template<>
+inline void DictEncoder<ByteArray>::AddDictKey(const ByteArray& v) {
+  uint8_t* heap = pool_->Allocate(v.len);
+  if (UNLIKELY(v.len > 0 && heap == nullptr)) {
+    throw ParquetException("out of memory");
+  }
+  memcpy(heap, v.ptr, v.len);
+
+  uniques_.push_back(ByteArray(v.len, heap));
+  dict_encoded_size_ += v.len + sizeof(uint32_t);
+}
+
+template<>
+inline void DictEncoder<FixedLenByteArray>::AddDictKey(const FixedLenByteArray& v) {
+  uint8_t* heap = pool_->Allocate(type_length_);
+  if (UNLIKELY(type_length_ > 0 && heap == nullptr)) {
+    throw ParquetException("out of memory");
+  }
+  memcpy(heap, v.ptr, type_length_);
+
+  uniques_.push_back(FixedLenByteArray(heap));
+  dict_encoded_size_ += type_length_;
+}
+
+template <typename T>
+inline void DictEncoder<T>::WriteDict(uint8_t* buffer) {
+  // For primitive types, only a memcpy
+  memcpy(buffer, &uniques_[0], sizeof(T) * uniques_.size());
+}
+
+// ByteArray and FLBA already have the dictionary encoded in their data heaps
+template <>
+inline void DictEncoder<ByteArray>::WriteDict(uint8_t* buffer) {
+  for (const ByteArray& v : uniques_) {
+    memcpy(buffer, reinterpret_cast<const void*>(&v.len), sizeof(uint32_t));
+    buffer += sizeof(uint32_t);
+    memcpy(buffer, v.ptr, v.len);
+    buffer += v.len;
+  }
+}
+
+template <>
+inline void DictEncoder<FixedLenByteArray>::WriteDict(uint8_t* buffer) {
+  for (const FixedLenByteArray& v : uniques_) {
+    memcpy(buffer, v.ptr, type_length_);
+    buffer += type_length_;
+  }
+}
+
+inline int DictEncoderBase::WriteIndices(uint8_t* buffer, int buffer_len) {
+  // Write bit width in first byte
+  *buffer = bit_width();
+  ++buffer;
+  --buffer_len;
+
+  RleEncoder encoder(buffer, buffer_len, bit_width());
+  for (int index : buffered_indices_) {
+    if (!encoder.Put(index)) return -1;
+  }
+  encoder.Flush();
+  return 1 + encoder.len();
+}
+
 } // namespace parquet_cpp
 
 #endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
index 50ba48f..ce91a29 100644
--- a/src/parquet/encodings/encoder.h
+++ b/src/parquet/encodings/encoder.h
@@ -39,11 +39,6 @@ class Encoder {
 
   virtual ~Encoder() {}
 
-  // Subclasses should override the ones they support
-  virtual void Encode(const T* src, int num_values, OutputStream* dst) {
-    throw ParquetException("Encoder does not implement this type.");
-  }
-
   const Encoding::type encoding() const { return encoding_; }
 
  protected:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
new file mode 100644
index 0000000..10310ed
--- /dev/null
+++ b/src/parquet/encodings/encoding-test.cc
@@ -0,0 +1,309 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <string>
+#include <vector>
+
+#include "parquet/schema/descriptor.h"
+#include "parquet/encodings/dictionary-encoding.h"
+#include "parquet/encodings/plain-encoding.h"
+#include "parquet/types.h"
+#include "parquet/schema/types.h"
+#include "parquet/util/bit-util.h"
+#include "parquet/util/buffer.h"
+#include "parquet/util/dict-encoding.h"
+#include "parquet/util/output.h"
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+
+namespace parquet_cpp {
+
+namespace test {
+
+TEST(VectorBooleanTest, TestEncodeDecode) {
+  // PARQUET-454
+  int nvalues = 10000;
+  int nbytes = BitUtil::Ceil(nvalues, 8);
+
+  // seed the prng so failure is deterministic
+  vector<bool> draws = flip_coins_seed(nvalues, 0.5, 0);
+
+  PlainEncoder<Type::BOOLEAN> encoder(nullptr);
+  PlainDecoder<Type::BOOLEAN> decoder(nullptr);
+
+  InMemoryOutputStream dst;
+  encoder.Encode(draws, nvalues, &dst);
+
+  std::shared_ptr<Buffer> encode_buffer = dst.GetBuffer();
+  ASSERT_EQ(nbytes, encode_buffer->size());
+
+  vector<uint8_t> decode_buffer(nbytes);
+  const uint8_t* decode_data = &decode_buffer[0];
+
+  decoder.SetData(nvalues, encode_buffer->data(), encode_buffer->size());
+  int values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
+  ASSERT_EQ(nvalues, values_decoded);
+
+  for (int i = 0; i < nvalues; ++i) {
+    ASSERT_EQ(draws[i], BitUtil::GetArrayBit(decode_data, i)) << i;
+  }
+}
+
+// ----------------------------------------------------------------------
+// test data generation
+
+template <typename T>
+void GenerateData(int num_values, T* out, vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  random_numbers(num_values, 0, std::numeric_limits<T>::min(),
+      std::numeric_limits<T>::max(), out);
+}
+
+template <>
+void GenerateData<bool>(int num_values, bool* out, vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  random_bools(num_values, 0.5, 0, out);
+}
+
+template <>
+void GenerateData<Int96>(int num_values, Int96* out, vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  random_Int96_numbers(num_values, 0, std::numeric_limits<int32_t>::min(),
+      std::numeric_limits<int32_t>::max(), out);
+}
+
+template <>
+void GenerateData<ByteArray>(int num_values, ByteArray* out, vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  int max_byte_array_len = 12;
+  heap->resize(num_values * max_byte_array_len);
+  random_byte_array(num_values, 0, heap->data(), out, 2, max_byte_array_len);
+}
+
+static int flba_length = 8;
+
+template <>
+void GenerateData<FLBA>(int num_values, FLBA* out, vector<uint8_t>* heap) {
+  // seed the prng so failure is deterministic
+  heap->resize(num_values * flba_length);
+  random_fixed_byte_array(num_values, 0, heap->data(), flba_length, out);
+}
+
+template <typename T>
+void VerifyResults(T* result, T* expected, int num_values) {
+  for (int i = 0; i < num_values; ++i) {
+    ASSERT_EQ(expected[i], result[i]) << i;
+  }
+}
+
+template <>
+void VerifyResults<FLBA>(FLBA* result, FLBA* expected, int num_values) {
+  for (int i = 0; i < num_values; ++i) {
+    ASSERT_EQ(0, memcmp(expected[i].ptr, result[i].ptr, flba_length)) << i;
+  }
+}
+
+// ----------------------------------------------------------------------
+// Create some column descriptors
+
+template <typename T>
+std::shared_ptr<ColumnDescriptor> ExampleDescr() {
+  return nullptr;
+}
+
+template <>
+std::shared_ptr<ColumnDescriptor> ExampleDescr<FLBA>() {
+  auto node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL,
+      flba_length, LogicalType::UTF8);
+  return std::make_shared<ColumnDescriptor>(node, 0, 0);
+}
+
+// ----------------------------------------------------------------------
+// Plain encoding tests
+
+template <typename Type>
+class TestEncodingBase : public ::testing::Test {
+ public:
+  typedef typename Type::c_type T;
+  static constexpr int TYPE = Type::type_num;
+
+  void SetUp() {
+    descr_ = ExampleDescr<T>();
+    if (descr_) {
+      type_length_ = descr_->type_length();
+    }
+  }
+
+  void InitData(int nvalues, int repeats) {
+    num_values_ = nvalues * repeats;
+    input_bytes_.resize(num_values_ * sizeof(T));
+    output_bytes_.resize(num_values_ * sizeof(T));
+    draws_ = reinterpret_cast<T*>(input_bytes_.data());
+    decode_buf_ = reinterpret_cast<T*>(output_bytes_.data());
+    GenerateData<T>(nvalues, draws_, &data_buffer_);
+
+    // add some repeated values
+    for (int j = 1; j < repeats; ++j) {
+      for (int i = 0; i < nvalues; ++i) {
+        draws_[nvalues * j + i] = draws_[i];
+      }
+    }
+  }
+
+  virtual void CheckRoundtrip() = 0;
+
+  void Execute(int nvalues, int repeats) {
+    InitData(nvalues, repeats);
+    CheckRoundtrip();
+  }
+
+ protected:
+  MemPool pool_;
+
+  int num_values_;
+  int type_length_;
+  T* draws_;
+  T* decode_buf_;
+  vector<uint8_t> input_bytes_;
+  vector<uint8_t> output_bytes_;
+  vector<uint8_t> data_buffer_;
+
+  std::shared_ptr<Buffer> encode_buffer_;
+  std::shared_ptr<ColumnDescriptor> descr_;
+};
+
+// Member variables of a dependent base class are not visible to unqualified
+// lookup in templated subclasses. Possibly revisit this class layering later.
+#define USING_BASE_MEMBERS()                    \
+  using TestEncodingBase<Type>::pool_;          \
+  using TestEncodingBase<Type>::descr_;         \
+  using TestEncodingBase<Type>::num_values_;    \
+  using TestEncodingBase<Type>::draws_;         \
+  using TestEncodingBase<Type>::data_buffer_;   \
+  using TestEncodingBase<Type>::type_length_;   \
+  using TestEncodingBase<Type>::encode_buffer_; \
+  using TestEncodingBase<Type>::decode_buf_;
+
+
+template <typename Type>
+class TestPlainEncoding : public TestEncodingBase<Type> {
+ public:
+  typedef typename Type::c_type T;
+  static constexpr int TYPE = Type::type_num;
+
+  virtual void CheckRoundtrip() {
+    PlainEncoder<TYPE> encoder(descr_.get());
+    PlainDecoder<TYPE> decoder(descr_.get());
+    InMemoryOutputStream dst;
+    encoder.Encode(draws_, num_values_, &dst);
+
+    encode_buffer_ = dst.GetBuffer();
+
+    decoder.SetData(num_values_, encode_buffer_->data(),
+        encode_buffer_->size());
+    int values_decoded = decoder.Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+    VerifyResults<T>(decode_buf_, draws_, num_values_);
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+};
+
+TYPED_TEST_CASE(TestPlainEncoding, ParquetTypes);
+
+TYPED_TEST(TestPlainEncoding, BasicRoundTrip) {
+  this->Execute(10000, 1);
+}
+
+// ----------------------------------------------------------------------
+// Dictionary encoding tests
+
+typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+                         ByteArrayType, FLBAType> DictEncodedTypes;
+
+template <typename Type>
+class TestDictionaryEncoding : public TestEncodingBase<Type> {
+ public:
+  typedef typename Type::c_type T;
+  static constexpr int TYPE = Type::type_num;
+
+  void CheckRoundtrip() {
+    DictEncoder<T> encoder(&pool_, type_length_);
+
+    dict_buffer_ = std::make_shared<OwnedMutableBuffer>();
+    auto indices = std::make_shared<OwnedMutableBuffer>();
+
+    ASSERT_NO_THROW(
+        {
+          for (int i = 0; i < num_values_; ++i) {
+            encoder.Put(draws_[i]);
+          }
+        });
+    dict_buffer_->Resize(encoder.dict_encoded_size());
+    encoder.WriteDict(dict_buffer_->mutable_data());
+
+    indices->Resize(encoder.EstimatedDataEncodedSize());
+    int actual_bytes = encoder.WriteIndices(indices->mutable_data(),
+        indices->size());
+    indices->Resize(actual_bytes);
+
+    PlainDecoder<TYPE> dict_decoder(descr_.get());
+    dict_decoder.SetData(encoder.num_entries(), dict_buffer_->data(),
+        dict_buffer_->size());
+
+    DictionaryDecoder<TYPE> decoder(descr_.get());
+    decoder.SetDict(&dict_decoder);
+
+    decoder.SetData(num_values_, indices->data(), indices->size());
+    int values_decoded = decoder.Decode(decode_buf_, num_values_);
+    ASSERT_EQ(num_values_, values_decoded);
+
+    // TODO(wesm): The DictionaryDecoder must stay alive because the decoded
+    // values' data is owned by a buffer inside the DictionaryDecoder. We
+    // should revisit when data lifetime is reviewed more generally.
+    VerifyResults<T>(decode_buf_, draws_, num_values_);
+  }
+
+ protected:
+  USING_BASE_MEMBERS();
+  std::shared_ptr<OwnedMutableBuffer> dict_buffer_;
+};
+
+TYPED_TEST_CASE(TestDictionaryEncoding, DictEncodedTypes);
+
+TYPED_TEST(TestDictionaryEncoding, BasicRoundTrip) {
+  this->Execute(2500, 2);
+}
+
+TEST(TestDictionaryEncoding, CannotDictDecodeBoolean) {
+  PlainDecoder<Type::BOOLEAN> dict_decoder(nullptr);
+  DictionaryDecoder<Type::BOOLEAN> decoder(nullptr);
+
+  ASSERT_THROW(decoder.SetDict(&dict_decoder), ParquetException);
+}
+
+} // namespace test
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/plain-encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc
deleted file mode 100644
index 7ebd21f..0000000
--- a/src/parquet/encodings/plain-encoding-test.cc
+++ /dev/null
@@ -1,232 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdlib>
-#include <cstring>
-#include <string>
-#include <vector>
-
-#include <gtest/gtest.h>
-
-#include "parquet/schema/descriptor.h"
-#include "parquet/encodings/plain-encoding.h"
-#include "parquet/types.h"
-#include "parquet/schema/types.h"
-#include "parquet/util/bit-util.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/output.h"
-#include "parquet/util/test-common.h"
-
-using std::string;
-using std::vector;
-
-namespace parquet_cpp {
-
-namespace test {
-
-TEST(VectorBooleanTest, TestEncodeDecode) {
-  // PARQUET-454
-  size_t nvalues = 10000;
-  size_t nbytes = BitUtil::Ceil(nvalues, 8);
-
-  // seed the prng so failure is deterministic
-  vector<bool> draws = flip_coins_seed(nvalues, 0.5, 0);
-
-  PlainEncoder<Type::BOOLEAN> encoder(nullptr);
-  PlainDecoder<Type::BOOLEAN> decoder(nullptr);
-
-  InMemoryOutputStream dst;
-  encoder.Encode(draws, nvalues, &dst);
-
-  std::shared_ptr<Buffer> encode_buffer = dst.GetBuffer();
-  ASSERT_EQ(nbytes, encode_buffer->size());
-
-  vector<uint8_t> decode_buffer(nbytes);
-  const uint8_t* decode_data = &decode_buffer[0];
-
-  decoder.SetData(nvalues, encode_buffer->data(), encode_buffer->size());
-  size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
-  ASSERT_EQ(nvalues, values_decoded);
-
-  for (size_t i = 0; i < nvalues; ++i) {
-    ASSERT_EQ(draws[i], BitUtil::GetArrayBit(decode_data, i)) << i;
-  }
-}
-
-template<typename T, int TYPE>
-class EncodeDecode{
- public:
-  void init_data(int nvalues) {
-    num_values_ = nvalues;
-    input_bytes_.resize(num_values_ * sizeof(T));
-    output_bytes_.resize(num_values_ * sizeof(T));
-    draws_ = reinterpret_cast<T*>(input_bytes_.data());
-    decode_buf_ = reinterpret_cast<T*>(output_bytes_.data());
-  }
-
-  void generate_data() {
-    // seed the prng so failure is deterministic
-    random_numbers(num_values_, 0, std::numeric_limits<T>::min(),
-       std::numeric_limits<T>::max(), draws_);
-  }
-
-  void encode_decode(ColumnDescriptor *d) {
-    PlainEncoder<TYPE> encoder(d);
-    PlainDecoder<TYPE> decoder(d);
-
-    InMemoryOutputStream dst;
-    encoder.Encode(draws_, num_values_, &dst);
-
-    encode_buffer_ = dst.GetBuffer();
-
-    decoder.SetData(num_values_, encode_buffer_->data(),
-        encode_buffer_->size());
-    size_t values_decoded = decoder.Decode(decode_buf_, num_values_);
-    ASSERT_EQ(num_values_, values_decoded);
-  }
-
-  void verify_results() {
-    for (size_t i = 0; i < num_values_; ++i) {
-      ASSERT_EQ(draws_[i], decode_buf_[i]) << i;
-    }
-  }
-
-  void execute(int nvalues, ColumnDescriptor *d) {
-    init_data(nvalues);
-    generate_data();
-    encode_decode(d);
-    verify_results();
-  }
-
- private:
-  int num_values_;
-  T* draws_;
-  T* decode_buf_;
-  vector<uint8_t> input_bytes_;
-  vector<uint8_t> output_bytes_;
-  vector<uint8_t> data_buffer_;
-
-  std::shared_ptr<Buffer> encode_buffer_;
-};
-
-template<>
-void EncodeDecode<bool, Type::BOOLEAN>::generate_data() {
-  // seed the prng so failure is deterministic
-  random_bools(num_values_, 0.5, 0, draws_);
-}
-
-template<>
-void EncodeDecode<Int96, Type::INT96>::generate_data() {
-  // seed the prng so failure is deterministic
-    random_Int96_numbers(num_values_, 0, std::numeric_limits<int32_t>::min(),
-       std::numeric_limits<int32_t>::max(), draws_);
-}
-
-template<>
-void EncodeDecode<Int96, Type::INT96>::verify_results() {
-  for (size_t i = 0; i < num_values_; ++i) {
-    ASSERT_EQ(draws_[i].value[0], decode_buf_[i].value[0]) << i;
-    ASSERT_EQ(draws_[i].value[1], decode_buf_[i].value[1]) << i;
-    ASSERT_EQ(draws_[i].value[2], decode_buf_[i].value[2]) << i;
-  }
-}
-
-template<>
-void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::generate_data() {
-  // seed the prng so failure is deterministic
-  int max_byte_array_len = 12;
-  int num_bytes = max_byte_array_len + sizeof(uint32_t);
-  size_t nbytes = num_values_ * num_bytes;
-  data_buffer_.resize(nbytes);
-  random_byte_array(num_values_, 0, data_buffer_.data(), draws_,
-      max_byte_array_len);
-}
-
-template<>
-void EncodeDecode<ByteArray, Type::BYTE_ARRAY>::verify_results() {
-  for (size_t i = 0; i < num_values_; ++i) {
-    ASSERT_EQ(draws_[i].len, decode_buf_[i].len) << i;
-    ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, draws_[i].len)) << i;
-  }
-}
-
-static int flba_length = 8;
-template<>
-void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::generate_data() {
-  // seed the prng so failure is deterministic
-  size_t nbytes = num_values_ * flba_length;
-  data_buffer_.resize(nbytes);
-  ASSERT_EQ(nbytes, data_buffer_.size());
-  random_fixed_byte_array(num_values_, 0, data_buffer_.data(), flba_length, draws_);
-}
-
-template<>
-void EncodeDecode<FLBA, Type::FIXED_LEN_BYTE_ARRAY>::verify_results() {
-  for (size_t i = 0; i < num_values_; ++i) {
-    ASSERT_EQ(0, memcmp(draws_[i].ptr, decode_buf_[i].ptr, flba_length)) << i;
-  }
-}
-
-int num_values = 10000;
-
-TEST(BoolEncodeDecode, TestEncodeDecode) {
-  EncodeDecode<bool, Type::BOOLEAN> obj;
-  obj.execute(num_values, nullptr);
-}
-
-TEST(Int32EncodeDecode, TestEncodeDecode) {
-  EncodeDecode<int32_t, Type::INT32> obj;
-  obj.execute(num_values, nullptr);
-}
-
-TEST(Int64EncodeDecode, TestEncodeDecode) {
-  EncodeDecode<int64_t, Type::INT64> obj;
-  obj.execute(num_values, nullptr);
-}
-
-TEST(FloatEncodeDecode, TestEncodeDecode) {
-  EncodeDecode<float, Type::FLOAT> obj;
-  obj.execute(num_values, nullptr);
-}
-
-TEST(DoubleEncodeDecode, TestEncodeDecode) {
-  EncodeDecode<double, Type::DOUBLE> obj;
-  obj.execute(num_values, nullptr);
-}
-
-TEST(Int96EncodeDecode, TestEncodeDecode) {
-  EncodeDecode<Int96, Type::INT96> obj;
-  obj.execute(num_values, nullptr);
-}
-
-TEST(BAEncodeDecode, TestEncodeDecode) {
-  EncodeDecode<ByteArray, Type::BYTE_ARRAY> obj;
-  obj.execute(num_values, nullptr);
-}
-
-TEST(FLBAEncodeDecode, TestEncodeDecode) {
-  schema::NodePtr node;
-  node = schema::PrimitiveNode::MakeFLBA("name", Repetition::OPTIONAL,
-      flba_length, LogicalType::UTF8);
-  ColumnDescriptor d(node, 0, 0);
-  EncodeDecode<FixedLenByteArray, Type::FIXED_LEN_BYTE_ARRAY> obj;
-  obj.execute(num_values, &d);
-}
-
-} // namespace test
-} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index 83ee40c..9adabdf 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -40,7 +40,13 @@ class PlainDecoder : public Decoder<TYPE> {
 
   explicit PlainDecoder(const ColumnDescriptor* descr) :
       Decoder<TYPE>(descr, Encoding::PLAIN),
-      data_(NULL), len_(0) {}
+      data_(NULL), len_(0) {
+    if (descr_ && descr_->physical_type() == Type::FIXED_LEN_BYTE_ARRAY) {
+      type_length_ = descr_->type_length();
+    } else {
+      type_length_ = -1;
+    }
+  }
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
     num_values_ = num_values;
@@ -49,55 +55,69 @@ class PlainDecoder : public Decoder<TYPE> {
   }
 
   virtual int Decode(T* buffer, int max_values);
+
  private:
+  using Decoder<TYPE>::descr_;
   const uint8_t* data_;
   int len_;
+  int type_length_;
 };
 
-template <int TYPE>
-inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) {
-  max_values = std::min(max_values, num_values_);
-  int size = max_values * sizeof(T);
-  if (len_ < size)  ParquetException::EofException();
-  memcpy(buffer, data_, size);
-  data_ += size;
-  len_ -= size;
-  num_values_ -= max_values;
-  return max_values;
+// Decode routine templated on C++ type rather than type enum
+template <typename T>
+inline int DecodePlain(const uint8_t* data, int64_t data_size, int num_values,
+    int type_length, T* out) {
+  int bytes_to_decode = num_values * sizeof(T);
+  if (data_size < bytes_to_decode) {
+    ParquetException::EofException();
+  }
+  memcpy(out, data, bytes_to_decode);
+  return bytes_to_decode;
 }
 
-// Template specialization for BYTE_ARRAY
-// BA does not currently own its data
-// the lifetime is tied to the input stream
+// Template specialization for BYTE_ARRAY. The written values do not own
+// their data; the lifetime is tied to the decoder's input buffer.
 template <>
-inline int PlainDecoder<Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
-    int max_values) {
-  max_values = std::min(max_values, num_values_);
-  for (int i = 0; i < max_values; ++i) {
-    uint32_t len = buffer[i].len = *reinterpret_cast<const uint32_t*>(data_);
-    if (len_ < sizeof(uint32_t) + len) ParquetException::EofException();
-    buffer[i].ptr = data_ + sizeof(uint32_t);
-    data_ += sizeof(uint32_t) + len;
-    len_ -= sizeof(uint32_t) + len;
+inline int DecodePlain<ByteArray>(const uint8_t* data, int64_t data_size, int num_values,
+    int type_length, ByteArray* out) {
+  int bytes_decoded = 0;
+  int increment;
+  for (int i = 0; i < num_values; ++i) {
+    if (data_size < static_cast<int64_t>(sizeof(uint32_t))) {
+      ParquetException::EofException();
+    }
+    uint32_t len = out[i].len = *reinterpret_cast<const uint32_t*>(data);
+    increment = sizeof(uint32_t) + len;
+    if (data_size < increment) ParquetException::EofException();
+    out[i].ptr = data + sizeof(uint32_t);
+    data += increment;
+    data_size -= increment;
+    bytes_decoded += increment;
   }
-  num_values_ -= max_values;
-  return max_values;
+  return bytes_decoded;
 }
 
-// Template specialization for FIXED_LEN_BYTE_ARRAY
-// FLBA does not currently own its data
-// the lifetime is tied to the input stream
+// Template specialization for FIXED_LEN_BYTE_ARRAY. The written values do
+// not own their data; the lifetime is tied to the decoder's input buffer.
 template <>
-inline int PlainDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Decode(
-    FixedLenByteArray* buffer, int max_values) {
-  max_values = std::min(max_values, num_values_);
-  int len = descr_->type_length();
-  for (int i = 0; i < max_values; ++i) {
-    if (len_ < len) ParquetException::EofException();
-    buffer[i].ptr = data_;
-    data_ += len;
-    len_ -= len;
+inline int DecodePlain<FixedLenByteArray>(const uint8_t* data, int64_t data_size,
+    int num_values, int type_length, FixedLenByteArray* out) {
+  int bytes_to_decode = type_length * num_values;
+  if (data_size < bytes_to_decode) {
+    ParquetException::EofException();
   }
+  for (int i = 0; i < num_values; ++i) {
+    out[i].ptr = data;
+    data += type_length;
+    data_size -= type_length;
+  }
+  return bytes_to_decode;
+}
+
+template <int TYPE>
+inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) {
+  max_values = std::min(max_values, num_values_);
+  int bytes_consumed = DecodePlain<T>(data_, len_, max_values,
+      type_length_, buffer);
+  data_ += bytes_consumed;
+  len_ -= bytes_consumed;
   num_values_ -= max_values;
   return max_values;
 }
@@ -155,7 +175,7 @@ class PlainEncoder : public Encoder<TYPE> {
   explicit PlainEncoder(const ColumnDescriptor* descr) :
       Encoder<TYPE>(descr, Encoding::PLAIN) {}
 
-  virtual void Encode(const T* src, int num_values, OutputStream* dst);
+  void Encode(const T* src, int num_values, OutputStream* dst);
 };
 
 template <>

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index 7aff74a..08d4607 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -31,8 +31,6 @@
 
 namespace parquet_cpp {
 
-class SchemaDescriptor;
-
 // 16 MB is the default maximum page header size
 static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index be22e5a..e99140c 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -15,14 +15,13 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include <gtest/gtest.h>
 #include <cstdlib>
 #include <cstdint>
 #include <iostream>
 #include <memory>
 #include <string>
 
-#include <gtest/gtest.h>
-
 #include "parquet/file/reader.h"
 #include "parquet/column/reader.h"
 #include "parquet/column/scanner.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/schema/schema-descriptor-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/schema-descriptor-test.cc b/src/parquet/schema/schema-descriptor-test.cc
index 7677615..3b1734b 100644
--- a/src/parquet/schema/schema-descriptor-test.cc
+++ b/src/parquet/schema/schema-descriptor-test.cc
@@ -17,13 +17,12 @@
 
 // Schema / column descriptor correctness tests (from flat Parquet schemas)
 
+#include <gtest/gtest.h>
 #include <cstdint>
 #include <cstdlib>
 #include <string>
 #include <vector>
 
-#include <gtest/gtest.h>
-
 #include "parquet/exception.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/schema/types.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 8c5e123..f59f6a9 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -128,11 +128,24 @@ struct PageType {
 // ----------------------------------------------------------------------
 
 struct ByteArray {
+  ByteArray() {}
+  ByteArray(uint32_t len, const uint8_t* ptr) : len(len), ptr(ptr) {}
   uint32_t len;
   const uint8_t* ptr;
+
+  bool operator==(const ByteArray& other) const {
+    return this->len == other.len &&
+      0 == memcmp(this->ptr, other.ptr, this->len);
+  }
+
+  bool operator!=(const ByteArray& other) const {
+    return this->len != other.len || 0 != memcmp(this->ptr, other.ptr, this->len);
+  }
 };
 
 struct FixedLenByteArray {
+  FixedLenByteArray() {}
+  explicit FixedLenByteArray(const uint8_t* ptr) : ptr(ptr) {}
   const uint8_t* ptr;
 };
 
@@ -140,6 +153,14 @@ typedef FixedLenByteArray FLBA;
 
 MANUALLY_ALIGNED_STRUCT(1) Int96 {
   uint32_t value[3];
+
+  bool operator==(const Int96& other) const {
+    return 0 == memcmp(this->value, other.value, 3 * sizeof(uint32_t));
+  }
+
+  bool operator!=(const Int96& other) const {
+    return !(*this == other);
+  }
 };
 STRUCT_END(Int96, 12);
 
@@ -241,6 +262,21 @@ struct type_traits<Type::FIXED_LEN_BYTE_ARRAY> {
   static constexpr const char* printf_code = "s";
 };
 
+template <Type::type TYPE>
+struct DataType {
+  static constexpr Type::type type_num = TYPE;
+  typedef typename type_traits<TYPE>::value_type c_type;
+};
+
+typedef DataType<Type::BOOLEAN> BooleanType;
+typedef DataType<Type::INT32> Int32Type;
+typedef DataType<Type::INT64> Int64Type;
+typedef DataType<Type::INT96> Int96Type;
+typedef DataType<Type::FLOAT> FloatType;
+typedef DataType<Type::DOUBLE> DoubleType;
+typedef DataType<Type::BYTE_ARRAY> ByteArrayType;
+typedef DataType<Type::FIXED_LEN_BYTE_ARRAY> FLBAType;
+
 template <int TYPE>
 inline std::string format_fwf(int width) {
   std::stringstream ss;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index 2b782fc..a009129 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -20,22 +20,27 @@ install(FILES
   bit-stream-utils.h
   bit-stream-utils.inline.h
   bit-util.h
-  cpu-info.h
-  sse-info.h
+  buffer-builder.h
   compiler-util.h
+  cpu-info.h
+  dict-encoding.h
+  hash-util.h
+  input.h
   logging.h
   macros.h
+  mem-pool.h
+  output.h
   rle-encoding.h
   stopwatch.h
-  input.h
-  output.h
+  sse-info.h
   DESTINATION include/parquet/util)
 
 add_library(parquet_util STATIC
   buffer.cc
+  cpu-info.cc
   input.cc
+  mem-pool.cc
   output.cc
-  cpu-info.cc
 )
 
 if(PARQUET_BUILD_TESTS)
@@ -58,5 +63,6 @@ endif()
 
 ADD_PARQUET_TEST(bit-util-test)
 ADD_PARQUET_TEST(buffer-test)
+ADD_PARQUET_TEST(mem-pool-test)
 ADD_PARQUET_TEST(output-test)
 ADD_PARQUET_TEST(rle-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/bit-stream-utils.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-stream-utils.h b/src/parquet/util/bit-stream-utils.h
index b93b90e..3636128 100644
--- a/src/parquet/util/bit-stream-utils.h
+++ b/src/parquet/util/bit-stream-utils.h
@@ -20,9 +20,9 @@
 #ifndef PARQUET_UTIL_BIT_STREAM_UTILS_H
 #define PARQUET_UTIL_BIT_STREAM_UTILS_H
 
+#include <string.h>
 #include <algorithm>
 #include <cstdint>
-#include <string.h>
 
 #include "parquet/util/compiler-util.h"
 #include "parquet/util/logging.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/bit-util-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-util-test.cc b/src/parquet/util/bit-util-test.cc
index a8b6be0..5ea4c11 100644
--- a/src/parquet/util/bit-util-test.cc
+++ b/src/parquet/util/bit-util-test.cc
@@ -19,11 +19,12 @@
 
 #include <stdlib.h>
 #include <stdio.h>
-#include <iostream>
 #include <limits.h>
+#include <gtest/gtest.h>
 
 #include <boost/utility.hpp>
-#include <gtest/gtest.h>
+
+#include <iostream>
 
 #include "parquet/util/bit-util.h"
 #include "parquet/util/bit-stream-utils.inline.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/buffer-builder.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer-builder.h b/src/parquet/util/buffer-builder.h
new file mode 100644
index 0000000..6fab6c5
--- /dev/null
+++ b/src/parquet/util/buffer-builder.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Initially imported from Impala on 2016-02-23
+
+#ifndef PARQUET_UTIL_BUFFER_BUILDER_H
+#define PARQUET_UTIL_BUFFER_BUILDER_H
+
+#include <stdlib.h>
+#include <string.h>
+#include <cstdint>
+
+namespace parquet_cpp {
+
+/// Utility class to build an in-memory buffer.
+class BufferBuilder {
+ public:
+  BufferBuilder(uint8_t* dst_buffer, int dst_len)
+    : buffer_(dst_buffer), capacity_(dst_len), size_(0) {
+  }
+
+  BufferBuilder(char* dst_buffer, int dst_len)
+    : buffer_(reinterpret_cast<uint8_t*>(dst_buffer)),
+      capacity_(dst_len), size_(0) {
+  }
+
+  inline void Append(const void* buffer, int len) {
+    memcpy(buffer_ + size_, buffer, len);
+    size_ += len;
+  }
+
+  template<typename T>
+  inline void Append(const T& v) {
+    Append(&v, sizeof(T));
+  }
+
+  int capacity() const { return capacity_; }
+  int size() const { return size_; }
+
+ private:
+  uint8_t* buffer_;
+  int capacity_;
+  int size_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_BUFFER_BUILDER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/cpu-info.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/cpu-info.cc b/src/parquet/util/cpu-info.cc
index 610fb62..2a9f59d 100644
--- a/src/parquet/util/cpu-info.cc
+++ b/src/parquet/util/cpu-info.cc
@@ -24,16 +24,18 @@
 #include <sys/sysctl.h>
 #endif
 
+#include <mmintrin.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+
 #include <boost/algorithm/string.hpp>
+
 #include <algorithm>
 #include <cstdint>
 #include <iostream>
 #include <fstream>
-#include <mmintrin.h>
 #include <sstream>
-#include <stdlib.h>
-#include <string.h>
-#include <unistd.h>
 #include <string>
 
 #include "parquet/exception.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/dict-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/dict-encoding.h b/src/parquet/util/dict-encoding.h
new file mode 100644
index 0000000..315b88e
--- /dev/null
+++ b/src/parquet/util/dict-encoding.h
@@ -0,0 +1,36 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_DICT_ENCODING_H
+#define PARQUET_UTIL_DICT_ENCODING_H
+
+#include <algorithm>
+#include <cstdint>
+#include <limits>
+#include <vector>
+
+#include "parquet/types.h"
+#include "parquet/encodings/plain-encoding.h"
+#include "parquet/util/hash-util.h"
+#include "parquet/util/mem-pool.h"
+#include "parquet/util/rle-encoding.h"
+
+namespace parquet_cpp {
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_DICT_ENCODING_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/hash-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/hash-util.h b/src/parquet/util/hash-util.h
new file mode 100644
index 0000000..5572ca9
--- /dev/null
+++ b/src/parquet/util/hash-util.h
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// From Apache Impala as of 2016-02-22
+
+#ifndef PARQUET_UTIL_HASH_UTIL_H
+#define PARQUET_UTIL_HASH_UTIL_H
+
+#include <cstdint>
+
+#include "parquet/util/compiler-util.h"
+#include "parquet/util/cpu-info.h"
+#include "parquet/util/logging.h"
+#include "parquet/util/sse-util.h"
+
+namespace parquet_cpp {
+
+/// Utility class to compute hash values.
+class HashUtil {
+ public:
+  /// Compute the Crc32 hash for data using SSE4 instructions.  The input hash
+  /// parameter is the current hash/seed value.
+  /// This should only be called if SSE is supported.
+  /// This is ~4x faster than Fnv/Boost Hash.
+  /// TODO: crc32 hashes with different seeds do not result in different hash functions.
+  /// The resulting hashes are correlated.
+  /// TODO: update this to also use SSE4_crc32_u64 and SSE4_crc32_u16 where appropriate.
+  static uint32_t CrcHash(const void* data, int32_t bytes, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    uint32_t words = bytes / sizeof(uint32_t);
+    bytes = bytes % sizeof(uint32_t);
+
+    const uint32_t* p = reinterpret_cast<const uint32_t*>(data);
+    while (words--) {
+      hash = SSE4_crc32_u32(hash, *p);
+      ++p;
+    }
+
+    const uint8_t* s = reinterpret_cast<const uint8_t*>(p);
+    while (bytes--) {
+      hash = SSE4_crc32_u8(hash, *s);
+      ++s;
+    }
+
+    // The lower half of the CRC hash has poor uniformity, so swap the halves
+    // for anyone who only uses the first several bits of the hash.
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 1-byte data
+  static inline uint32_t CrcHash1(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint8_t* s = reinterpret_cast<const uint8_t*>(v);
+    hash = SSE4_crc32_u8(hash, *s);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 2-byte data
+  static inline uint32_t CrcHash2(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint16_t* s = reinterpret_cast<const uint16_t*>(v);
+    hash = SSE4_crc32_u16(hash, *s);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 4-byte data
+  static inline uint32_t CrcHash4(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint32_t* p = reinterpret_cast<const uint32_t*>(v);
+    hash = SSE4_crc32_u32(hash, *p);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 8-byte data
+  static inline uint32_t CrcHash8(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+    hash = SSE4_crc32_u64(hash, *p);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 12-byte data
+  static inline uint32_t CrcHash12(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+    hash = SSE4_crc32_u64(hash, *p);
+    ++p;
+    hash = SSE4_crc32_u32(hash, *reinterpret_cast<const uint32_t *>(p));
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  /// CrcHash() specialized for 16-byte data
+  static inline uint32_t CrcHash16(const void* v, uint32_t hash) {
+    DCHECK(CpuInfo::IsSupported(CpuInfo::SSE4_2));
+    const uint64_t* p = reinterpret_cast<const uint64_t*>(v);
+    hash = SSE4_crc32_u64(hash, *p);
+    ++p;
+    hash = SSE4_crc32_u64(hash, *p);
+    hash = (hash << 16) | (hash >> 16);
+    return hash;
+  }
+
+  static const uint64_t MURMUR_PRIME = 0xc6a4a7935bd1e995;
+  static const int MURMUR_R = 47;
+
+  /// Murmur2 hash implementation returning 64-bit hashes.
+  static uint64_t MurmurHash2_64(const void* input, int len, uint64_t seed) {
+    uint64_t h = seed ^ (len * MURMUR_PRIME);
+
+    const uint64_t* data = reinterpret_cast<const uint64_t*>(input);
+    const uint64_t* end = data + (len / sizeof(uint64_t));
+
+    while (data != end) {
+      uint64_t k = *data++;
+      k *= MURMUR_PRIME;
+      k ^= k >> MURMUR_R;
+      k *= MURMUR_PRIME;
+      h ^= k;
+      h *= MURMUR_PRIME;
+    }
+
+    const uint8_t* data2 = reinterpret_cast<const uint8_t*>(data);
+    switch (len & 7) {
+      case 7: h ^= uint64_t(data2[6]) << 48;
+      case 6: h ^= uint64_t(data2[5]) << 40;
+      case 5: h ^= uint64_t(data2[4]) << 32;
+      case 4: h ^= uint64_t(data2[3]) << 24;
+      case 3: h ^= uint64_t(data2[2]) << 16;
+      case 2: h ^= uint64_t(data2[1]) << 8;
+      case 1: h ^= uint64_t(data2[0]);
+              h *= MURMUR_PRIME;
+    }
+
+    h ^= h >> MURMUR_R;
+    h *= MURMUR_PRIME;
+    h ^= h >> MURMUR_R;
+    return h;
+  }
+
+  /// default values recommended by http://isthe.com/chongo/tech/comp/fnv/
+  static const uint32_t FNV_PRIME = 0x01000193; //   16777619
+  static const uint32_t FNV_SEED = 0x811C9DC5; // 2166136261
+  static const uint64_t FNV64_PRIME = 1099511628211UL;
+  static const uint64_t FNV64_SEED = 14695981039346656037UL;
+
+  /// Implementation of the Fowler-Noll-Vo hash function. This is not as performant
+  /// as boost's hash on int types (2x slower) but has better bit entropy.
+  /// For ints, boost just returns the value of the int which can be pathological.
+  /// For example, if the data is <1000, 2000, 3000, 4000, ..> and then the mod of 1000
+  /// is taken on the hash, all values will collide to the same bucket.
+  /// For string values, Fnv is slightly faster than boost.
+  /// IMPORTANT: FNV hash suffers from poor diffusion of the least significant bit,
+  /// which can lead to poor results when input bytes are duplicated.
+  /// See FnvHash64to32() for how this can be mitigated.
+  static uint64_t FnvHash64(const void* data, int32_t bytes, uint64_t hash) {
+    const uint8_t* ptr = reinterpret_cast<const uint8_t*>(data);
+    while (bytes--) {
+      hash = (*ptr ^ hash) * FNV64_PRIME;
+      ++ptr;
+    }
+    return hash;
+  }
+
+  /// Return a 32-bit hash computed by invoking FNV-64 and folding the result to 32-bits.
+  /// This technique is recommended instead of FNV-32 since the LSB of an FNV hash is the
+  /// XOR of the LSBs of its input bytes, leading to poor results for duplicate inputs.
+  /// The input seed 'hash' is duplicated so the top half of the seed is not all zero.
+  /// Data length must be at least 1 byte: zero-length data should be handled separately,
+  /// for example using CombineHash with a unique constant value to avoid returning the
+  /// hash argument. Zero-length data gives terrible results: the initial hash value is
+  /// xored with itself cancelling all bits.
+  static uint32_t FnvHash64to32(const void* data, int32_t bytes, uint32_t hash) {
+    // IMPALA-2270: this function should never be used for zero-byte inputs.
+    DCHECK_GT(bytes, 0);
+    uint64_t hash_u64 = hash | ((uint64_t)hash << 32);
+    hash_u64 = FnvHash64(data, bytes, hash_u64);
+    return (hash_u64 >> 32) ^ (hash_u64 & 0xFFFFFFFF);
+  }
+
+  /// Computes the hash value for data.  Will call either CrcHash or MurmurHash
+  /// depending on hardware capabilities.
+  /// Seed values for different steps of the query execution should use different seeds
+  /// to prevent accidental key collisions. (See IMPALA-219 for more details).
+  static uint32_t Hash(const void* data, int32_t bytes, uint32_t seed) {
+    if (LIKELY(CpuInfo::IsSupported(CpuInfo::SSE4_2))) {
+      return CrcHash(data, bytes, seed);
+    } else {
+      return MurmurHash2_64(data, bytes, seed);
+    }
+  }
+
+  /// The magic number (used in hash_combine()) 0x9e3779b9 = 2^32 / (golden ratio).
+  static const uint32_t HASH_COMBINE_SEED = 0x9e3779b9;
+
+  /// Combine hashes 'value' and 'seed' to get a new hash value.  Similar to
+  /// boost::hash_combine(), but for uint32_t. This function should be used with a
+  /// constant first argument to update the hash value for zero-length values such as
+  /// NULL, boolean, and empty strings.
+  static inline uint32_t HashCombine32(uint32_t value, uint32_t seed) {
+    return seed ^ (HASH_COMBINE_SEED + value + (seed << 6) + (seed >> 2));
+  }
+
+  // Get 32 more bits of randomness from a 32-bit hash:
+  static inline uint32_t Rehash32to32(const uint32_t hash) {
+    // Constants generated by uuidgen(1) with the -r flag
+    static const uint64_t m = 0x7850f11ec6d14889ull, a = 0x6773610597ca4c63ull;
+    // This is strongly universal hashing following Dietzfelbinger's "Universal hashing
+    // and k-wise independent random variables via integer arithmetic without primes". As
+    // such, for any two distinct uint32_t's hash1 and hash2, the probability (over the
+    // randomness of the constants) that any subset of bit positions of
+    // Rehash32to32(hash1) is equal to the same subset of bit positions
+    // Rehash32to32(hash2) is minimal.
+    return (static_cast<uint64_t>(hash) * m + a) >> 32;
+  }
+
+  static inline uint64_t Rehash32to64(const uint32_t hash) {
+    static const uint64_t m1 = 0x47b6137a44974d91ull, m2 = 0x8824ad5ba2b7289cull,
+                          a1 = 0x705495c62df1424aull, a2 = 0x9efc49475c6bfb31ull;
+    const uint64_t hash1 = (static_cast<uint64_t>(hash) * m1 + a1) >> 32;
+    const uint64_t hash2 = (static_cast<uint64_t>(hash) * m2 + a2) >> 32;
+    return hash1 | (hash2 << 32);
+  }
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_HASH_UTIL_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c6e06929/src/parquet/util/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool-test.cc b/src/parquet/util/mem-pool-test.cc
new file mode 100644
index 0000000..de0b399
--- /dev/null
+++ b/src/parquet/util/mem-pool-test.cc
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// Initially imported from Apache Impala on 2016-02-23, and has been modified
+// since for parquet-cpp
+
+#include <gtest/gtest.h>
+#include <cstdint>
+#include <limits>
+#include <string>
+
+#include "parquet/util/mem-pool.h"
+#include "parquet/util/bit-util.h"
+
+namespace parquet_cpp {
+
+// Utility class to call private functions on MemPool.
+class MemPoolTest {
+ public:
+  static bool CheckIntegrity(MemPool* pool, bool current_chunk_empty) {
+    return pool->CheckIntegrity(current_chunk_empty);
+  }
+
+  static const int INITIAL_CHUNK_SIZE = MemPool::INITIAL_CHUNK_SIZE;
+  static const int MAX_CHUNK_SIZE = MemPool::MAX_CHUNK_SIZE;
+};
+
+const int MemPoolTest::INITIAL_CHUNK_SIZE;
+const int MemPoolTest::MAX_CHUNK_SIZE;
+
+TEST(MemPoolTest, Basic) {
+  MemPool p;
+  MemPool p2;
+  MemPool p3;
+
+  for (int iter = 0; iter < 2; ++iter) {
+    // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
+    for (int i = 0; i < 768; ++i) {
+      // pads to 32 bytes
+      p.Allocate(25);
+    }
+    // we handed back 24K
+    EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
+    // .. and allocated 28K of chunks (4, 8, 16)
+    EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());
+
+    // we're passing on the first two chunks, containing 12K of data; we're left with
+    // one chunk of 16K containing 12K of data
+    p2.AcquireData(&p, true);
+    EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
+    EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());
+
+    // we allocate 8K, for which there isn't enough room in the current chunk,
+    // so another one is allocated (32K)
+    p.Allocate(8 * 1024);
+    EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());
+
+    // we allocate 65K, which doesn't fit into the current chunk or the default
+    // size of the next allocated chunk (64K)
+    p.Allocate(65 * 1024);
+    EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // Clear() resets allocated data, but doesn't remove any chunks
+    p.Clear();
+    EXPECT_EQ(0, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // next allocation reuses existing chunks
+    p.Allocate(1024);
+    EXPECT_EQ(1024, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // ... unless it doesn't fit into any available chunk
+    p.Allocate(120 * 1024);
+    EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // ... Try another chunk that fits into an existing chunk
+    p.Allocate(33 * 1024);
+    EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
+    EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // we're releasing 3 chunks, which get added to p2
+    p2.AcquireData(&p, false);
+    EXPECT_EQ(0, p.total_allocated_bytes());
+    EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    EXPECT_EQ(0, p.GetTotalChunkSizes());
+
+    p3.AcquireData(&p2, true);  // we're keeping the 65k chunk
+    EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
+    EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());
+
+    p.FreeAll();
+    p2.FreeAll();
+    p3.FreeAll();
+  }
+}
+
+// Test that we can keep an allocated chunk and a free chunk.
+// This case verifies that when chunks are acquired by another memory pool the
+// remaining chunks are consistent if there were more than one used chunk and some
+// free chunks.
+TEST(MemPoolTest, Keep) {
+  MemPool p;
+  p.Allocate(4*1024);
+  p.Allocate(8*1024);
+  p.Allocate(16*1024);
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+  p.Clear();
+  EXPECT_EQ(0, p.total_allocated_bytes());
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+  p.Allocate(1*1024);
+  p.Allocate(4*1024);
+  EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+
+  MemPool p2;
+  p2.AcquireData(&p, true);
+  EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
+  EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
+  EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
+  EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());
+
+  p.FreeAll();
+  p2.FreeAll();
+}
+
+// Tests that we can return partial allocations.
+TEST(MemPoolTest, ReturnPartial) {
+  MemPool p;
+  uint8_t* ptr = p.Allocate(1024);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  memset(ptr, 0, 1024);
+  p.ReturnPartialAllocation(1024);
+
+  uint8_t* ptr2 = p.Allocate(1024);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(ptr == ptr2);
+  p.ReturnPartialAllocation(1016);
+
+  ptr2 = p.Allocate(1016);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(ptr2 == ptr + 8);
+  p.ReturnPartialAllocation(512);
+  memset(ptr2, 1, 1016 - 512);
+
+  uint8_t* ptr3 = p.Allocate(512);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(ptr3 == ptr + 512);
+  memset(ptr3, 2, 512);
+
+  for (int i = 0; i < 8; ++i) {
+    EXPECT_EQ(0, ptr[i]);
+  }
+  for (int i = 8; i < 512; ++i) {
+    EXPECT_EQ(1, ptr[i]);
+  }
+  for (int i = 512; i < 1024; ++i) {
+    EXPECT_EQ(2, ptr[i]);
+  }
+
+  p.FreeAll();
+}
+
+// Test that the MemPool overhead is bounded when we make allocations of
+// INITIAL_CHUNK_SIZE.
+TEST(MemPoolTest, MemoryOverhead) {
+  MemPool p;
+  const int alloc_size = MemPoolTest::INITIAL_CHUNK_SIZE;
+  const int num_allocs = 1000;
+  int64_t total_allocated = 0;
+
+  for (int i = 0; i < num_allocs; ++i) {
+    uint8_t* mem = p.Allocate(alloc_size);
+    ASSERT_TRUE(mem != NULL);
+    total_allocated += alloc_size;
+
+    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
+    // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most
+    // one empty chunk at the end.
+    EXPECT_LE(wasted_memory, MemPoolTest::MAX_CHUNK_SIZE);
+    // The chunk doubling algorithm should not allocate chunks larger than the total
+    // amount of memory already allocated.
+    EXPECT_LE(wasted_memory, total_allocated);
+  }
+
+  p.FreeAll();
+}
+
+// Test that the MemPool overhead is bounded when we make alternating large and small
+// allocations.
+TEST(MemPoolTest, FragmentationOverhead) {
+  MemPool p;
+  const int num_allocs = 100;
+  int64_t total_allocated = 0;
+
+  for (int i = 0; i < num_allocs; ++i) {
+    int alloc_size = i % 2 == 0 ? 1 : MemPoolTest::MAX_CHUNK_SIZE;
+    uint8_t* mem = p.Allocate(alloc_size);
+    ASSERT_TRUE(mem != NULL);
+    total_allocated += alloc_size;
+
+    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
+    // Fragmentation should not waste more than half of each completed chunk.
+    EXPECT_LE(wasted_memory, total_allocated + MemPoolTest::MAX_CHUNK_SIZE);
+  }
+
+  p.FreeAll();
+}
+
+} // namespace parquet_cpp

