parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-820: Decoders should directly emit arrays with spacing for null entries
Date Wed, 18 Jan 2017 00:10:52 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 0804faf4f -> 65e7db196


PARQUET-820: Decoders should directly emit arrays with spacing for null entries

Old:

```
In [3]: import pyarrow.io as paio
   ...: import pyarrow.parquet as pq
   ...:
   ...: with open('yellow_tripdata_2016-01.parquet', 'r') as f:
   ...:     buf = f.read()
   ...: buf = paio.buffer_from_bytes(buf)
   ...:
   ...: def read_parquet():
   ...:   reader = paio.BufferReader(buf)
   ...:   df = pq.read_table(reader)
   ...:
   ...: %timeit read_parquet()
   ...:
1 loop, best of 3: 1.21 s per loop
```

New:

```
In [1]: import pyarrow.io as paio
   ...: import pyarrow.parquet as pq
   ...:
   ...: with open('yellow_tripdata_2016-01.parquet', 'r') as f:
   ...:     buf = f.read()
   ...: buf = paio.buffer_from_bytes(buf)
   ...:
   ...: def read_parquet():
   ...:   reader = paio.BufferReader(buf)
   ...:   df = pq.read_table(reader)
   ...:
   ...: %timeit read_parquet()
   ...:
1 loop, best of 3: 906 ms per loop
```

Arrow->Pandas conversion for comparison:

```
In [5]: %timeit df.to_pandas()
1 loop, best of 3: 567 ms per loop
```

All benchmarks were done on a single CPU core

I have to add better test coverage before this can go in. There is still some room for future improvements that won't be done in this PR:
 * `DefinitionLevelsToBitmap` should be done in the DefinitionLevelsDecoder
 * `GetBatchWithDictSpaced` is something for a vectorization/bitmap ninja.

Author: Uwe L. Korn <uwelk@xhochy.com>
Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>

Closes #218 from xhochy/PARQUET-820 and squashes the following commits:

e6db697 [Korn, Uwe] Add INIT_BITSET macro
8f17db9 [Korn, Uwe] Use arrow::TypeTraits
8dcab1b [Uwe L. Korn] Adjust documentation for ReadBatchSpaced
798bc83 [Uwe L. Korn] Test ReadSpaced
9dc6dc0 [Uwe L. Korn] Test DecodeSpaced
ccb70dc [Uwe L. Korn] Add fast path for non-nullable-batches
6f99191 [Uwe L. Korn] Move bit reading into a macro
393d99a [Uwe L. Korn] Explicitly mark overrides
3424ae3 [Uwe L. Korn] Make more use of the bitmaps
685ad34 [Uwe L. Korn] Remove unused include
9b0f105 [Uwe L. Korn] Use bitset in the whole GetBatchWithDict loop
907c165 [Uwe L. Korn] Use bitset in literalbatch
0ec4b38 [Uwe L. Korn] Remove unused code
f6c4b5e [Uwe L. Korn] ninja format
cbf0176 [Uwe L. Korn] DecodeSpaced in dictionary encoder
3dfa43b [Uwe L. Korn] Directly read valid_bits
15aa324 [Uwe L. Korn] Only use ReadSpaced where needed
96dd347 [Korn, Uwe] PARQUET-820: Decoders should directly emit arrays with spacing for null entries


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/65e7db19
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/65e7db19
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/65e7db19

Branch: refs/heads/master
Commit: 65e7db1965a1117852df45904fcd21eb40c6d6b5
Parents: 0804faf
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Tue Jan 17 19:10:45 2017 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Tue Jan 17 19:10:45 2017 -0500

----------------------------------------------------------------------
 src/parquet/arrow/arrow-reader-writer-test.cc |  17 +-
 src/parquet/arrow/reader.cc                   | 231 ++++++++++++++-------
 src/parquet/column/column-reader-test.cc      |  87 +++++++-
 src/parquet/column/column-writer-test.cc      |   3 +-
 src/parquet/column/reader.h                   | 112 ++++++++++
 src/parquet/column/scanner-test.cc            |   3 +-
 src/parquet/encodings/decoder.h               |  25 +++
 src/parquet/encodings/dictionary-encoding.h   |  12 +-
 src/parquet/encodings/encoding-test.cc        |  11 +-
 src/parquet/file/file-serialize-test.cc       |   3 +-
 src/parquet/thrift/util.h                     |   6 +-
 src/parquet/util/bit-util.h                   |  13 ++
 src/parquet/util/rle-encoding.h               |  80 +++++++
 src/parquet/util/test-common.h                |   3 +-
 14 files changed, 499 insertions(+), 107 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 2089abd..0418d42 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -205,9 +205,9 @@ class TestParquetIO : public ::testing::Test {
 
   void ReaderFromSink(std::unique_ptr<FileReader>* out) {
     std::shared_ptr<Buffer> buffer = sink_->GetBuffer();
-    ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
-            ::arrow::default_memory_pool(), ::parquet::default_reader_properties(),
-            nullptr, out));
+    ASSERT_OK_NO_THROW(
+        OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+            ::parquet::default_reader_properties(), nullptr, out));
   }
 
   void ReadSingleColumnFile(
@@ -268,8 +268,7 @@ class TestParquetIO : public ::testing::Test {
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
     ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
-    ::arrow::StringType, ::arrow::BinaryType>
-    TestTypes;
+    ::arrow::StringType, ::arrow::BinaryType> TestTypes;
 
 TYPED_TEST_CASE(TestParquetIO, TestTypes);
 
@@ -568,8 +567,8 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
  public:
   typedef typename c_type_trait<TestType>::ArrowCType T;
 
-  void MakeTestFile(std::vector<T>& values, int num_chunks,
-      std::unique_ptr<FileReader>* reader) {
+  void MakeTestFile(
+      std::vector<T>& values, int num_chunks, std::unique_ptr<FileReader>* reader) {
     std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
     std::unique_ptr<ParquetFileWriter> file_writer = this->MakeWriter(schema);
     size_t chunk_size = values.size() / num_chunks;
@@ -621,8 +620,8 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
 
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type,
-    ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, ::arrow::DoubleType>
-    PrimitiveTestTypes;
+    ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType,
+    ::arrow::DoubleType> PrimitiveTestTypes;
 
 TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index 7e14f56..c9f986a 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "parquet/arrow/schema.h"
+#include "parquet/util/bit-util.h"
 
 #include "arrow/api.h"
 #include "arrow/type_traits.h"
@@ -53,17 +54,7 @@ static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timest
 }
 
 template <typename ArrowType>
-struct ArrowTypeTraits {
-  typedef ::arrow::NumericArray<ArrowType> array_type;
-};
-
-template <>
-struct ArrowTypeTraits<::arrow::BooleanType> {
-  typedef ::arrow::BooleanArray array_type;
-};
-
-template <typename ArrowType>
-using ArrayType = typename ArrowTypeTraits<ArrowType>::array_type;
+using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
 class FileReader::Impl {
  public:
@@ -98,10 +89,11 @@ class FlatColumnReader::Impl {
   template <typename ArrowType>
   Status InitDataBuffer(int batch_size);
   template <typename ArrowType, typename ParquetType>
-  void ReadNullableFlatBatch(const int16_t* def_levels,
-      typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read);
+  Status ReadNullableFlatBatch(TypedColumnReader<ParquetType>* reader,
+      int16_t* def_levels, int64_t values_to_read, int64_t* levels_read);
   template <typename ArrowType, typename ParquetType>
-  void ReadNonNullableBatch(typename ParquetType::c_type* values, int64_t values_read);
+  Status ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
+      int64_t values_to_read, int64_t* levels_read);
 
  private:
   void NextRowGroup();
@@ -202,8 +194,8 @@ Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
 
 Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
     MemoryPool* allocator, std::unique_ptr<FileReader>* reader) {
-  return OpenFile(file, allocator, ::parquet::default_reader_properties(),
-      nullptr, reader);
+  return OpenFile(
+      file, allocator, ::parquet::default_reader_properties(), nullptr, reader);
 }
 
 Status FileReader::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out) {
@@ -244,83 +236,177 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
 }
 
 template <typename ArrowType, typename ParquetType>
-void FlatColumnReader::Impl::ReadNonNullableBatch(
-    typename ParquetType::c_type* values, int64_t values_read) {
+Status FlatColumnReader::Impl::ReadNonNullableBatch(
+    TypedColumnReader<ParquetType>* reader, int64_t values_to_read,
+    int64_t* levels_read) {
   using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
+  auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
+  int64_t values_read;
+  PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(
+                           values_to_read, nullptr, nullptr, values, &values_read));
 
   ArrowCType* out_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
   std::copy(values, values + values_read, out_ptr + valid_bits_idx_);
   valid_bits_idx_ += values_read;
+
+  return Status::OK();
 }
 
+#define NONNULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)                 \
+  template <>                                                                      \
+  Status FlatColumnReader::Impl::ReadNonNullableBatch<ArrowType, ParquetType>(     \
+      TypedColumnReader<ParquetType> * reader, int64_t values_to_read,             \
+      int64_t * levels_read) {                                                     \
+    int64_t values_read;                                                           \
+    CType* out_ptr = reinterpret_cast<CType*>(data_buffer_ptr_);                   \
+    PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(values_to_read, nullptr, \
+                             nullptr, out_ptr + valid_bits_idx_, &values_read));   \
+                                                                                   \
+    valid_bits_idx_ += values_read;                                                \
+                                                                                   \
+    return Status::OK();                                                           \
+  }
+
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
+NONNULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
+
 template <>
-void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
-    Int96* values, int64_t values_read) {
+Status FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::TimestampType, Int96Type>(
+    TypedColumnReader<Int96Type>* reader, int64_t values_to_read, int64_t* levels_read) {
+  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96Type), false));
+  auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
+  int64_t values_read;
+  PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(
+                           values_to_read, nullptr, nullptr, values, &values_read));
+
   int64_t* out_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
   for (int64_t i = 0; i < values_read; i++) {
-    out_ptr[i] = impala_timestamp_to_nanoseconds(values[i]);
+    *out_ptr++ = impala_timestamp_to_nanoseconds(values[i]);
   }
   valid_bits_idx_ += values_read;
+
+  return Status::OK();
 }
 
 template <>
-void FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
-    bool* values, int64_t values_read) {
+Status FlatColumnReader::Impl::ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
+    TypedColumnReader<BooleanType>* reader, int64_t values_to_read,
+    int64_t* levels_read) {
+  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
+  auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
+  int64_t values_read;
+  PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatch(
+                           values_to_read, nullptr, nullptr, values, &values_read));
+
   for (int64_t i = 0; i < values_read; i++) {
     if (values[i]) { ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_); }
     valid_bits_idx_++;
   }
+
+  return Status::OK();
 }
 
 template <typename ArrowType, typename ParquetType>
-void FlatColumnReader::Impl::ReadNullableFlatBatch(const int16_t* def_levels,
-    typename ParquetType::c_type* values, int64_t values_read, int64_t levels_read) {
+Status FlatColumnReader::Impl::ReadNullableFlatBatch(
+    TypedColumnReader<ParquetType>* reader, int16_t* def_levels, int64_t values_to_read,
+    int64_t* levels_read) {
   using ArrowCType = typename ArrowType::c_type;
+  using ParquetCType = typename ParquetType::c_type;
+
+  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
+  auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
+  int null_count;
+  PARQUET_CATCH_NOT_OK(*levels_read =
+                           reader->ReadBatchSpaced(values_to_read, def_levels, nullptr,
+                               values, &null_count, valid_bits_ptr_, valid_bits_idx_));
 
   auto data_ptr = reinterpret_cast<ArrowCType*>(data_buffer_ptr_);
-  int values_idx = 0;
-  for (int64_t i = 0; i < levels_read; i++) {
-    if (def_levels[i] < descr_->max_definition_level()) {
-      null_count_++;
-    } else {
-      ::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
-      data_ptr[valid_bits_idx_] = values[values_idx++];
-    }
-    valid_bits_idx_++;
+  INIT_BITSET(valid_bits_ptr_, valid_bits_idx_);
+
+  for (int64_t i = 0; i < *levels_read; i++) {
+    if (bitset & (1 << bit_offset)) { data_ptr[valid_bits_idx_ + i] = values[i]; }
+    READ_NEXT_BITSET(valid_bits_ptr_);
   }
+  null_count_ += null_count;
+  valid_bits_idx_ += *levels_read;
+
+  return Status::OK();
 }
 
+#define NULLABLE_BATCH_FAST_PATH(ArrowType, ParquetType, CType)                 \
+  template <>                                                                   \
+  Status FlatColumnReader::Impl::ReadNullableFlatBatch<ArrowType, ParquetType>( \
+      TypedColumnReader<ParquetType> * reader, int16_t * def_levels,            \
+      int64_t values_to_read, int64_t * levels_read) {                          \
+    auto data_ptr = reinterpret_cast<CType*>(data_buffer_ptr_);                 \
+    int null_count;                                                             \
+    PARQUET_CATCH_NOT_OK(*levels_read = reader->ReadBatchSpaced(values_to_read, \
+                             def_levels, nullptr, data_ptr + valid_bits_idx_,   \
+                             &null_count, valid_bits_ptr_, valid_bits_idx_));   \
+                                                                                \
+    valid_bits_idx_ += *levels_read;                                            \
+    null_count_ += null_count;                                                  \
+                                                                                \
+    return Status::OK();                                                        \
+  }
+
+NULLABLE_BATCH_FAST_PATH(::arrow::Int32Type, Int32Type, int32_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::Int64Type, Int64Type, int64_t)
+NULLABLE_BATCH_FAST_PATH(::arrow::FloatType, FloatType, float)
+NULLABLE_BATCH_FAST_PATH(::arrow::DoubleType, DoubleType, double)
+
 template <>
-void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::TimestampType, Int96Type>(
-    const int16_t* def_levels, Int96* values, int64_t values_read, int64_t levels_read) {
+Status FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::TimestampType, Int96Type>(
+    TypedColumnReader<Int96Type>* reader, int16_t* def_levels, int64_t values_to_read,
+    int64_t* levels_read) {
+  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(Int96Type), false));
+  auto values = reinterpret_cast<Int96*>(values_buffer_.mutable_data());
+  int null_count;
+  PARQUET_CATCH_NOT_OK(*levels_read =
+                           reader->ReadBatchSpaced(values_to_read, def_levels, nullptr,
+                               values, &null_count, valid_bits_ptr_, valid_bits_idx_));
+
   auto data_ptr = reinterpret_cast<int64_t*>(data_buffer_ptr_);
-  int values_idx = 0;
-  for (int64_t i = 0; i < levels_read; i++) {
-    if (def_levels[i] < descr_->max_definition_level()) {
-      null_count_++;
-    } else {
-      ::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
-      data_ptr[valid_bits_idx_] = impala_timestamp_to_nanoseconds(values[values_idx++]);
+  INIT_BITSET(valid_bits_ptr_, valid_bits_idx_);
+  for (int64_t i = 0; i < *levels_read; i++) {
+    if (bitset & (1 << bit_offset)) {
+      data_ptr[valid_bits_idx_ + i] = impala_timestamp_to_nanoseconds(values[i]);
     }
-    valid_bits_idx_++;
+    READ_NEXT_BITSET(valid_bits_ptr_);
   }
+  null_count_ += null_count;
+  valid_bits_idx_ += *levels_read;
+
+  return Status::OK();
 }
 
 template <>
-void FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
-    const int16_t* def_levels, bool* values, int64_t values_read, int64_t levels_read) {
-  int values_idx = 0;
-  for (int64_t i = 0; i < levels_read; i++) {
-    if (def_levels[i] < descr_->max_definition_level()) {
-      null_count_++;
-    } else {
-      ::arrow::BitUtil::SetBit(valid_bits_ptr_, valid_bits_idx_);
-      if (values[values_idx++]) {
-        ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_);
-      }
+Status FlatColumnReader::Impl::ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
+    TypedColumnReader<BooleanType>* reader, int16_t* def_levels, int64_t values_to_read,
+    int64_t* levels_read) {
+  RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
+  auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
+  int null_count;
+  PARQUET_CATCH_NOT_OK(*levels_read =
+                           reader->ReadBatchSpaced(values_to_read, def_levels, nullptr,
+                               values, &null_count, valid_bits_ptr_, valid_bits_idx_));
+
+  INIT_BITSET(valid_bits_ptr_, valid_bits_idx_);
+  for (int64_t i = 0; i < *levels_read; i++) {
+    if (bitset & (1 << bit_offset)) {
+      if (values[i]) { ::arrow::BitUtil::SetBit(data_buffer_ptr_, valid_bits_idx_ + i); }
     }
-    valid_bits_idx_++;
+    READ_NEXT_BITSET(valid_bits_ptr_);
   }
+  valid_bits_idx_ += *levels_read;
+  null_count_ += null_count;
+
+  return Status::OK();
 }
 
 template <typename ArrowType>
@@ -347,13 +433,12 @@ template <typename ArrowType, typename ParquetType>
 Status FlatColumnReader::Impl::TypedReadBatch(
     int batch_size, std::shared_ptr<Array>* out) {
   using ArrowCType = typename ArrowType::c_type;
-  using ParquetCType = typename ParquetType::c_type;
 
   int values_to_read = batch_size;
   RETURN_NOT_OK(InitDataBuffer<ArrowType>(batch_size));
   valid_bits_idx_ = 0;
   if (descr_->max_definition_level() > 0) {
-    int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size) / 8;
+    int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size + 1) / 8;
     valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
     RETURN_NOT_OK(valid_bits_buffer_->Resize(valid_bits_size, false));
     valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
@@ -362,26 +447,22 @@ Status FlatColumnReader::Impl::TypedReadBatch(
   }
 
   while ((values_to_read > 0) && column_reader_) {
-    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(ParquetCType), false));
     if (descr_->max_definition_level() > 0) {
       RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false));
     }
     auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
-    int64_t values_read;
     int64_t levels_read;
     int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-    auto values = reinterpret_cast<ParquetCType*>(values_buffer_.mutable_data());
-    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
-                             values_to_read, def_levels, nullptr, values, &values_read));
-    values_to_read -= levels_read;
     if (descr_->max_definition_level() == 0) {
-      ReadNonNullableBatch<ArrowType, ParquetType>(values, values_read);
+      RETURN_NOT_OK((ReadNonNullableBatch<ArrowType, ParquetType>(
+          reader, values_to_read, &levels_read)));
     } else {
       // As per the defintion and checks for flat columns:
       // descr_->max_definition_level() == 1
-      ReadNullableFlatBatch<ArrowType, ParquetType>(
-          def_levels, values, values_read, levels_read);
+      RETURN_NOT_OK((ReadNullableFlatBatch<ArrowType, ParquetType>(
+          reader, def_levels, values_to_read, &levels_read)));
     }
+    values_to_read -= levels_read;
     if (!column_reader_->HasNext()) { NextRowGroup(); }
   }
 
@@ -427,7 +508,7 @@ Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>
   valid_bits_idx_ = 0;
   if (descr_->max_definition_level() > 0) {
     valid_bits_buffer_ = std::make_shared<PoolBuffer>(pool_);
-    int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size) / 8;
+    int valid_bits_size = ::arrow::BitUtil::CeilByte(batch_size + 1) / 8;
     RETURN_NOT_OK(valid_bits_buffer_->Resize(valid_bits_size, false));
     valid_bits_ptr_ = valid_bits_buffer_->mutable_data();
     memset(valid_bits_ptr_, 0, valid_bits_size);
@@ -435,26 +516,22 @@ Status FlatColumnReader::Impl::TypedReadBatch<::arrow::BooleanType, BooleanType>
   }
 
   while ((values_to_read > 0) && column_reader_) {
-    RETURN_NOT_OK(values_buffer_.Resize(values_to_read * sizeof(bool), false));
     if (descr_->max_definition_level() > 0) {
       RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t), false));
     }
     auto reader = dynamic_cast<TypedColumnReader<BooleanType>*>(column_reader_.get());
-    int64_t values_read;
     int64_t levels_read;
     int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-    auto values = reinterpret_cast<bool*>(values_buffer_.mutable_data());
-    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
-                             values_to_read, def_levels, nullptr, values, &values_read));
-    values_to_read -= levels_read;
     if (descr_->max_definition_level() == 0) {
-      ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(values, values_read);
+      RETURN_NOT_OK((ReadNonNullableBatch<::arrow::BooleanType, BooleanType>(
+          reader, values_to_read, &levels_read)));
     } else {
       // As per the defintion and checks for flat columns:
       // descr_->max_definition_level() == 1
-      ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
-          def_levels, values, values_read, levels_read);
+      RETURN_NOT_OK((ReadNullableFlatBatch<::arrow::BooleanType, BooleanType>(
+          reader, def_levels, values_to_read, &levels_read)));
     }
+    values_to_read -= levels_read;
     if (!column_reader_->HasNext()) { NextRowGroup(); }
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 5b27b73..5cf3084 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -43,6 +43,23 @@ using schema::NodePtr;
 
 namespace test {
 
+template <typename T>
+static inline bool vector_equal_with_def_levels(const vector<T>& left,
+    const vector<int16_t> def_levels, int16_t max_def_levels, const vector<T>& right) {
+  size_t i_left = 0;
+  for (size_t i = 0; i < right.size(); ++i) {
+    if (def_levels[i] != max_def_levels) { continue; }
+    if (left[i_left] != right[i]) {
+      std::cerr << "index " << i << " left was " << left[i_left] << " right was "
+                << right[i] << std::endl;
+      return false;
+    }
+    i_left++;
+  }
+
+  return true;
+}
+
 class TestPrimitiveReader : public ::testing::Test {
  public:
   void InitReader(const ColumnDescriptor* d) {
@@ -84,17 +101,69 @@ class TestPrimitiveReader : public ::testing::Test {
     ASSERT_EQ(0, values_read);
   }
 
+  void CheckResultsSpaced() {
+    vector<int32_t> vresult(num_levels_, -1);
+    vector<int16_t> dresult(num_levels_, -1);
+    vector<int16_t> rresult(num_levels_, -1);
+    vector<uint8_t> valid_bits(num_levels_, 255);
+    int total_values_read = 0;
+    int batch_actual = 0;
+    int null_count = -1;
+
+    Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+    int32_t batch_size = 8;
+    int batch = 0;
+    // This will cover both the cases
+    // 1) batch_size < page_size (multiple ReadBatch from a single page)
+    // 2) batch_size > page_size (BatchRead limits to a single page)
+    do {
+      batch = reader->ReadBatchSpaced(batch_size, dresult.data() + batch_actual,
+          rresult.data() + batch_actual, vresult.data() + batch_actual, &null_count,
+          valid_bits.data() + batch_actual, 0);
+      total_values_read += batch - null_count;
+      batch_actual += batch;
+      batch_size = std::max(batch_size * 2, 4096);
+    } while (batch > 0);
+
+    if (max_def_level_ > 0) {
+      ASSERT_TRUE(vector_equal(def_levels_, dresult));
+      ASSERT_TRUE(
+          vector_equal_with_def_levels(values_, dresult, max_def_level_, vresult));
+    } else {
+      ASSERT_TRUE(vector_equal(values_, vresult));
+    }
+    if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
+    ASSERT_EQ(num_levels_, batch_actual);
+    ASSERT_EQ(num_values_, total_values_read);
+    // catch improper writes at EOS
+    batch_actual = reader->ReadBatchSpaced(
+        5, nullptr, nullptr, nullptr, &null_count, valid_bits.data(), 0);
+    ASSERT_EQ(0, batch_actual);
+    ASSERT_EQ(0, null_count);
+  }
+
+  void Clear() {
+    values_.clear();
+    def_levels_.clear();
+    rep_levels_.clear();
+    pages_.clear();
+    reader_.reset();
+  }
+
   void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
     num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
         rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
     num_levels_ = num_pages * levels_per_page;
     InitReader(d);
     CheckResults();
-    values_.clear();
-    def_levels_.clear();
-    rep_levels_.clear();
-    pages_.clear();
-    reader_.reset();
+    Clear();
+
+    num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
+        rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
+    num_levels_ = num_pages * levels_per_page;
+    InitReader(d);
+    CheckResultsSpaced();
+    Clear();
   }
 
   void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
@@ -103,6 +172,14 @@ class TestPrimitiveReader : public ::testing::Test {
     num_levels_ = num_pages * levels_per_page;
     InitReader(d);
     CheckResults();
+    Clear();
+
+    num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
+        rep_levels_, values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
+    num_levels_ = num_pages * levels_per_page;
+    InitReader(d);
+    CheckResultsSpaced();
+    Clear();
   }
 
  protected:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 5430005..aac1a94 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -216,8 +216,7 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio
 }
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    BooleanType, ByteArrayType, FLBAType>
-    TestTypes;
+    BooleanType, ByteArrayType, FLBAType> TestTypes;
 
 TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index bf567d9..3b6a971 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -129,6 +129,27 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
   int64_t ReadBatch(int batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
       int64_t* values_read);
 
+  /// Read a batch of repetition levels, definition levels, and values from the
+  /// column and leave spaces for null entries in the values buffer.
+  ///
+  /// In comparision to ReadBatch the length of repetition and definition levels
+  /// is the same as of the number of values read.
+  ///
+  /// To fully exhaust a row group, you must read batches until the number of
+  /// values read reaches the number of stored values according to the metadata.
+  ///
+  /// @param valid_bits Memory allocated for a bitmap that indicates if
+  ///   the row is null or on the maximum definition level. For performance
+  ///   reasons the underlying buffer should be able to store 1 bit more than
+  ///   required. If this requires an additional byte, this byte is only read
+  ///   but never written to.
+  /// @param valid_bits_offset The offset in bits of the valid_bits where the
+  ///  first relevant bit resides.
+  ///
+  /// @return actual number of levels read
+  int64_t ReadBatchSpaced(int batch_size, int16_t* def_levels, int16_t* rep_levels,
+      T* values, int* null_count, uint8_t* valid_bits, int64_t valid_bits_offset);
+
   // Skip reading levels
   // Returns the number of levels skipped
   int64_t Skip(int64_t num_rows_to_skip);
@@ -145,6 +166,14 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
   // @returns: the number of values read into the out buffer
   int64_t ReadValues(int64_t batch_size, T* out);
 
+  // Read up to batch_size values from the current data page into the
+  // pre-allocated memory T*, leaving spaces for null entries according
+  // to the def_levels.
+  //
+  // @returns: the number of values read into the out buffer
+  int64_t ReadValuesSpaced(int64_t batch_size, T* out, int null_count,
+      uint8_t* valid_bits, int64_t valid_bits_offset);
+
   // Map of encoding type to the respective decoder object. For example, a
   // column chunk's data pages may include both dictionary-encoded and
   // plain-encoded data.
@@ -162,6 +191,13 @@ inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out)
 }
 
 template <typename DType>
+inline int64_t TypedColumnReader<DType>::ReadValuesSpaced(int64_t batch_size, T* out,
+    int null_count, uint8_t* valid_bits, int64_t valid_bits_offset) {
+  return current_decoder_->DecodeSpaced(
+      out, batch_size, null_count, valid_bits, valid_bits_offset);  // fills batch_size slots, leaving gaps at null positions
+}
+
+template <typename DType>
 inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_levels,
     int16_t* rep_levels, T* values, int64_t* values_read) {
   // HasNext invokes ReadNewPage
@@ -207,6 +243,82 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_
   return total_values;
 }
 
+inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
+    int16_t max_definition_level, int* null_count, uint8_t* valid_bits,
+    int64_t valid_bits_offset) {
+  int byte_offset = valid_bits_offset / 8;
+  int bit_offset = valid_bits_offset % 8;
+  uint8_t bitset = valid_bits[byte_offset];  // working copy of the current bitmap byte
+
+  for (int i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      bitset |= (1 << bit_offset);  // value present at max definition level: set validity bit
+    } else {
+      bitset &= ~(1 << bit_offset);  // null entry: clear the bit and count it
+      *null_count += 1;
+    }
+
+    bit_offset++;
+    if (bit_offset == 8) {
+      bit_offset = 0;
+      valid_bits[byte_offset] = bitset;  // flush the completed byte
+      byte_offset++;
+      // TODO: except for the final byte this reload shouldn't be needed; it may read one byte past the written bits, hence the caller's 1-extra-bit allocation requirement
+      bitset = valid_bits[byte_offset];
+    }
+  }
+  if (bit_offset != 0) { valid_bits[byte_offset] = bitset; }  // flush trailing partial byte
+}
+
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int batch_size,
+    int16_t* def_levels, int16_t* rep_levels, T* values, int* null_count_out,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
+  // HasNext invokes ReadNewPage
+  if (!HasNext()) {  // column exhausted: nothing read, no nulls
+    *null_count_out = 0;
+    return 0;
+  }
+
+  int64_t total_values;
+  // TODO(wesm): keep reading data pages until batch_size is reached, or the
+  // row group is finished
+  batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_);  // clamp to what remains in the current page
+
+  // If the field is required and non-repeated, there are no definition levels
+  if (descr_->max_definition_level() > 0) {
+    int64_t num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
+
+    // Not present for non-repeated fields
+    if (descr_->max_repetition_level() > 0) {
+      int64_t num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
+      if (num_def_levels != num_rep_levels) {
+        throw ParquetException("Number of decoded rep / def levels did not match");
+      }
+    }
+
+    // TODO: Move this into the DefinitionLevels reader
+    int null_count = 0;
+    int16_t max_definition_level = descr_->max_definition_level();
+    DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
+        &null_count, valid_bits, valid_bits_offset);  // builds the validity bitmap and counts nulls
+    *null_count_out = null_count;
+
+    total_values = ReadValuesSpaced(
+        num_def_levels, values, null_count, valid_bits, valid_bits_offset);  // decoder leaves gaps at null slots
+  } else {
+    // Required field, read all values
+    total_values = ReadValues(batch_size, values);
+    for (int64_t i = 0; i < total_values; i++) {
+      ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);  // every slot is valid
+    }
+    *null_count_out = 0;
+  }
+
+  num_decoded_values_ += total_values;  // total_values counts levels, i.e. includes null slots
+  return total_values;
+}
+
 template <typename DType>
 inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
   int64_t rows_to_skip = num_rows_to_skip;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/column/scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc
index 8eee191..3ac07dc 100644
--- a/src/parquet/column/scanner-test.cc
+++ b/src/parquet/column/scanner-test.cc
@@ -146,8 +146,7 @@ static int num_pages = 20;
 static int batch_size = 32;
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    ByteArrayType>
-    TestTypes;
+    ByteArrayType> TestTypes;
 
 using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
 using TestFLBAFlatScanner = TestFlatScanner<FLBAType>;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/encodings/decoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/decoder.h b/src/parquet/encodings/decoder.h
index 1ac9f35..58989e5 100644
--- a/src/parquet/encodings/decoder.h
+++ b/src/parquet/encodings/decoder.h
@@ -20,6 +20,8 @@
 
 #include <cstdint>
 
+#include <arrow/util/bit-util.h>
+
 #include "parquet/exception.h"
 #include "parquet/types.h"
 #include "parquet/util/memory.h"
@@ -48,6 +50,29 @@ class Decoder {
     throw ParquetException("Decoder does not implement this type.");
   }
 
+  // Decode the values in this data page but leave spaces for null entries.
+  // Returns num_values, i.e. the number of output slots written including null gaps.
+  // num_values is the size of the def_levels and buffer arrays including the number of
+  // null values; valid_bits[valid_bits_offset + i] says whether slot i holds a value.
+  virtual int DecodeSpaced(T* buffer, int num_values, int null_count,
+      const uint8_t* valid_bits, int64_t valid_bits_offset) {
+    int values_to_read = num_values - null_count;  // non-null values physically stored on the page
+    int values_read = Decode(buffer, values_to_read);
+    if (values_read != values_to_read) {
+      throw ParquetException("Number of values / definition_levels read did not match");
+    }
+
+    // Add spacing for null entries. As we have filled the buffer from the front,
+    // we need to add the spacing from the back.
+    int values_to_move = values_read;  // one past the last not-yet-scattered packed value
+    for (int i = num_values - 1; i >= 0; i--) {  // walk backwards so the in-place scatter never clobbers unread values
+      if (::arrow::BitUtil::GetBit(valid_bits, valid_bits_offset + i)) {
+        buffer[i] = buffer[--values_to_move];
+      }
+    }
+    return num_values;
+  }
+
   // Returns the number of values left (for the last call to SetData()). This is
   // the number of values left in this page.
   int values_left() const { return num_values_; }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index b79744a..191bded 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -51,7 +51,7 @@ class DictionaryDecoder : public Decoder<Type> {
   // Perform type-specific initiatialization
   void SetDict(Decoder<Type>* dictionary);
 
-  virtual void SetData(int num_values, const uint8_t* data, int len) {
+  void SetData(int num_values, const uint8_t* data, int len) override {
     num_values_ = num_values;
     if (len == 0) return;
     uint8_t bit_width = *data;
@@ -60,7 +60,7 @@ class DictionaryDecoder : public Decoder<Type> {
     idx_decoder_ = RleDecoder(data, len, bit_width);
   }
 
-  virtual int Decode(T* buffer, int max_values) {
+  int Decode(T* buffer, int max_values) override {
     max_values = std::min(max_values, num_values_);
     int decoded_values = idx_decoder_.GetBatchWithDict(dictionary_, buffer, max_values);
     if (decoded_values != max_values) { ParquetException::EofException(); }
@@ -68,6 +68,14 @@ class DictionaryDecoder : public Decoder<Type> {
     return max_values;
   }
 
+  int DecodeSpaced(T* buffer, int num_values, int null_count, const uint8_t* valid_bits,
+      int64_t valid_bits_offset) override {  // dictionary-aware override of Decoder<Type>::DecodeSpaced
+    int decoded_values = idx_decoder_.GetBatchWithDictSpaced(
+        dictionary_, buffer, num_values, null_count, valid_bits, valid_bits_offset);
+    if (decoded_values != num_values) { ParquetException::EofException(); }  // page ended before all slots were filled
+    return decoded_values;
+  }
+
  private:
   using Decoder<Type>::num_values_;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
index eccfc5d..999cad9 100644
--- a/src/parquet/encodings/encoding-test.cc
+++ b/src/parquet/encodings/encoding-test.cc
@@ -237,8 +237,7 @@ TYPED_TEST(TestPlainEncoding, BasicRoundTrip) {
 // Dictionary encoding tests
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    ByteArrayType, FLBAType>
-    DictEncodedTypes;
+    ByteArrayType, FLBAType> DictEncodedTypes;
 
 template <typename Type>
 class TestDictionaryEncoding : public TestEncodingBase<Type> {
@@ -271,6 +270,14 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
     // values' data is owned by a buffer inside the DictionaryEncoder. We
     // should revisit when data lifetime is reviewed more generally.
     VerifyResults<T>(decode_buf_, draws_, num_values_);
+
+    // Also test spaced decoding
+    decoder.SetData(num_values_, indices->data(), indices->size());
+    std::vector<uint8_t> valid_bits(BitUtil::RoundUpNumBytes(num_values_) + 1, 255);
+    values_decoded =
+        decoder.DecodeSpaced(decode_buf_, num_values_, 0, valid_bits.data(), 0);
+    ASSERT_EQ(num_values_, values_decoded);
+    VerifyResults<T>(decode_buf_, draws_, num_values_);
   }
 
  protected:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 7a90eeb..71dd5c4 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -106,8 +106,7 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
 };
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    BooleanType, ByteArrayType, FLBAType>
-    TestTypes;
+    BooleanType, ByteArrayType, FLBAType> TestTypes;
 
 TYPED_TEST_CASE(TestSerialize, TestTypes);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/thrift/util.h
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h
index 9d2b66f..92ea82c 100644
--- a/src/parquet/thrift/util.h
+++ b/src/parquet/thrift/util.h
@@ -99,8 +99,7 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali
   boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> tmem_transport(
       new apache::thrift::transport::TMemoryBuffer(const_cast<uint8_t*>(buf), *len));
   apache::thrift::protocol::TCompactProtocolFactoryT<
-      apache::thrift::transport::TMemoryBuffer>
-      tproto_factory;
+      apache::thrift::transport::TMemoryBuffer> tproto_factory;
   boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
       tproto_factory.getProtocol(tmem_transport);
   try {
@@ -122,8 +121,7 @@ inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
   boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer(
       new apache::thrift::transport::TMemoryBuffer(len));
   apache::thrift::protocol::TCompactProtocolFactoryT<
-      apache::thrift::transport::TMemoryBuffer>
-      tproto_factory;
+      apache::thrift::transport::TMemoryBuffer> tproto_factory;
   boost::shared_ptr<apache::thrift::protocol::TProtocol> tproto =
       tproto_factory.getProtocol(mem_buffer);
   try {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/util/bit-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-util.h b/src/parquet/util/bit-util.h
index b75f5a1..4d85864 100644
--- a/src/parquet/util/bit-util.h
+++ b/src/parquet/util/bit-util.h
@@ -34,6 +34,19 @@
 
 namespace parquet {
 
+#define INIT_BITSET(valid_bits_vector, valid_bits_index) \
+  int byte_offset = valid_bits_index / 8; \
+  int bit_offset = valid_bits_index % 8; \
+  uint8_t bitset = valid_bits_vector[byte_offset];  // INIT_BITSET: declare a byte/bit cursor plus a cached copy of the current byte
+
+#define READ_NEXT_BITSET(valid_bits_vector)  \
+  bit_offset++;                              \
+  if (bit_offset == 8) {                     \
+    bit_offset = 0;                          \
+    byte_offset++;                           \
+    bitset = valid_bits_vector[byte_offset]; \
+  }  // READ_NEXT_BITSET: advance the cursor, reloading the cached byte on byte boundaries
+
 // TODO(wesm): The source from Impala was depending on boost::make_unsigned
 //
 // We add a partial stub implementation here

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/rle-encoding.h b/src/parquet/util/rle-encoding.h
index d4be1fc..9b2bf56 100644
--- a/src/parquet/util/rle-encoding.h
+++ b/src/parquet/util/rle-encoding.h
@@ -119,6 +119,11 @@ class RleDecoder {
   template <typename T>
   int GetBatchWithDict(const Vector<T>& dictionary, T* values, int batch_size);
 
+  /// Like GetBatchWithDict but adds spacing for null entries
+  template <typename T>
+  int GetBatchWithDictSpaced(const Vector<T>& dictionary, T* values, int batch_size,
+      int null_count, const uint8_t* valid_bits, int64_t valid_bits_offset);
+
  protected:
   BitReader bit_reader_;
   /// Number of bits needed to encode the value. Must be between 0 and 64.
@@ -338,6 +343,81 @@ inline int RleDecoder::GetBatchWithDict(
 }
 
 template <typename T>
+inline int RleDecoder::GetBatchWithDictSpaced(const Vector<T>& dictionary, T* values,
+    int batch_size, int null_count, const uint8_t* valid_bits,
+    int64_t valid_bits_offset) {
+  DCHECK_GE(bit_width_, 0);
+  int values_read = 0;
+  int remaining_nulls = null_count;  // null slots not yet produced in this batch
+  INIT_BITSET(valid_bits, valid_bits_offset);
+
+  while (values_read < batch_size) {
+    bool is_valid = (bitset & (1 << bit_offset));  // validity bit for the next output slot
+    READ_NEXT_BITSET(valid_bits);
+
+    if (is_valid) {
+      if ((repeat_count_ == 0) && (literal_count_ == 0)) {
+        if (!NextCounts<T>()) return values_read;  // stop early if no further run can be decoded
+      }
+      if (repeat_count_ > 0) {
+        T value = dictionary[current_value_];
+        // The current index is already valid, we don't need to check that again
+        int repeat_batch = 1;
+        repeat_count_--;
+
+        while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) {  // extend over following slots, counting nulls as spacing
+          if (bitset & (1 << bit_offset)) {
+            repeat_count_--;
+          } else {
+            remaining_nulls--;
+          }
+          repeat_batch++;
+
+          READ_NEXT_BITSET(valid_bits);
+        }
+        std::fill(values + values_read, values + values_read + repeat_batch, value);  // null slots inside the run receive the same value as placeholders
+        values_read += repeat_batch;
+      } else if (literal_count_ > 0) {
+        int literal_batch = std::min(
+            batch_size - values_read - remaining_nulls, static_cast<int>(literal_count_));  // don't decode indices past the non-null slots left in this batch
+
+        // Decode the literals
+        constexpr int kBufferSize = 1024;
+        int indices[kBufferSize];
+        literal_batch = std::min(literal_batch, kBufferSize);  // cap to the on-stack index buffer; the outer loop resumes if more remain
+        int actual_read = bit_reader_.GetBatch(bit_width_, &indices[0], literal_batch);
+        DCHECK_EQ(actual_read, literal_batch);
+
+        int skipped = 0;
+        int literals_read = 1;
+        values[values_read] = dictionary[indices[0]];  // slot for the is_valid bit tested above
+
+        // Scatter the remaining decoded indices, skipping null slots
+        while (literals_read < literal_batch) {
+          if (bitset & (1 << bit_offset)) {
+            values[values_read + literals_read + skipped] =
+                dictionary[indices[literals_read]];
+            literals_read++;
+          } else {
+            skipped++;
+          }
+
+          READ_NEXT_BITSET(valid_bits);
+        }
+        literal_count_ -= literal_batch;
+        values_read += literal_batch + skipped;  // consumed literal values plus interleaved null slots
+        remaining_nulls -= skipped;
+      }
+    } else {
+      values_read++;  // null slot: advance the output without consuming a dictionary index
+      remaining_nulls--;
+    }
+  }
+
+  return values_read;
+}
+
+template <typename T>
 bool RleDecoder::NextCounts() {
   // Read the next run's indicator int, it could be a literal or repeated run.
   // The int is encoded as a vlq-encoded value.

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/65e7db19/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
index 2327aeb..edadb53 100644
--- a/src/parquet/util/test-common.h
+++ b/src/parquet/util/test-common.h
@@ -32,8 +32,7 @@ namespace parquet {
 namespace test {
 
 typedef ::testing::Types<BooleanType, Int32Type, Int64Type, Int96Type, FloatType,
-    DoubleType, ByteArrayType, FLBAType>
-    ParquetTypes;
+    DoubleType, ByteArrayType, FLBAType> ParquetTypes;
 
 template <typename T>
 static inline void assert_vector_equal(const vector<T>& left, const vector<T>& right) {


Mime
View raw message