parquet-commits mailing list archives

From w...@apache.org
Subject [1/2] parquet-cpp git commit: PARQUET-834: Support I/O of arrow::ListArray
Date Thu, 02 Feb 2017 21:14:38 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master b1c85caf9 -> ad56e7aea


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 993ff67..7556313 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -26,11 +26,18 @@
 #include "parquet/arrow/schema.h"
 
 #include "arrow/api.h"
+#include "arrow/type_traits.h"
 
+using arrow::Array;
 using arrow::BinaryArray;
+using arrow::BooleanArray;
+using arrow::Int16Array;
+using arrow::Int16Builder;
+using arrow::Field;
 using arrow::MemoryPool;
 using arrow::PoolBuffer;
 using arrow::PrimitiveArray;
+using arrow::ListArray;
 using arrow::Status;
 using arrow::Table;
 
@@ -43,19 +50,215 @@ namespace arrow {
 
 namespace BitUtil = ::arrow::BitUtil;
 
+class LevelBuilder : public ::arrow::ArrayVisitor {
+ public:
+  explicit LevelBuilder(MemoryPool* pool)
+      : def_levels_(pool, ::arrow::int16()), rep_levels_(pool, ::arrow::int16()) {
+    def_levels_buffer_ = std::make_shared<PoolBuffer>(pool);
+  }
+
+#define PRIMITIVE_VISIT(ArrowTypePrefix)                                \
+  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \
+    valid_bitmaps_.push_back(array.null_bitmap_data());                 \
+    null_counts_.push_back(array.null_count());                         \
+    values_type_ = array.type_enum();                                   \
+    values_array_ = &array;                                             \
+    return Status::OK();                                                \
+  }
+
+  PRIMITIVE_VISIT(Boolean)
+  PRIMITIVE_VISIT(Int8)
+  PRIMITIVE_VISIT(Int16)
+  PRIMITIVE_VISIT(Int32)
+  PRIMITIVE_VISIT(Int64)
+  PRIMITIVE_VISIT(UInt8)
+  PRIMITIVE_VISIT(UInt16)
+  PRIMITIVE_VISIT(UInt32)
+  PRIMITIVE_VISIT(UInt64)
+  PRIMITIVE_VISIT(HalfFloat)
+  PRIMITIVE_VISIT(Float)
+  PRIMITIVE_VISIT(Double)
+  PRIMITIVE_VISIT(String)
+  PRIMITIVE_VISIT(Binary)
+  PRIMITIVE_VISIT(Date)
+  PRIMITIVE_VISIT(Time)
+  PRIMITIVE_VISIT(Timestamp)
+  PRIMITIVE_VISIT(Interval)
+
+  Status Visit(const ListArray& array) override {
+    valid_bitmaps_.push_back(array.null_bitmap_data());
+    null_counts_.push_back(array.null_count());
+    offsets_.push_back(array.raw_offsets());
+
+    min_offset_idx_ = array.raw_offsets()[min_offset_idx_];
+    max_offset_idx_ = array.raw_offsets()[max_offset_idx_];
+
+    return array.values()->Accept(this);
+  }
+
+#define NOT_IMPLEMENTED_VISIT(ArrowTypePrefix)                          \
+  Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \
+    return Status::NotImplemented(                                      \
+        "Level generation for " #ArrowTypePrefix " not supported yet"); \
+  }
+
+  NOT_IMPLEMENTED_VISIT(Null)
+  NOT_IMPLEMENTED_VISIT(Struct)
+  NOT_IMPLEMENTED_VISIT(Union)
+  NOT_IMPLEMENTED_VISIT(Decimal)
+  NOT_IMPLEMENTED_VISIT(Dictionary)
+
+  Status GenerateLevels(const Array* array, int64_t offset, int64_t length,
+      const std::shared_ptr<Field>& field, int64_t* values_offset,
+      ::arrow::Type::type* values_type, int64_t* num_values, int64_t* num_levels,
+      std::shared_ptr<Buffer>* def_levels, std::shared_ptr<Buffer>* rep_levels,
+      const Array** values_array) {
+    // Work downwards to extract bitmaps and offsets
+    min_offset_idx_ = offset;
+    max_offset_idx_ = offset + length;
+    RETURN_NOT_OK(array->Accept(this));
+    *num_values = max_offset_idx_ - min_offset_idx_;
+    *values_offset = min_offset_idx_;
+    *values_type = values_type_;
+    *values_array = values_array_;
+
+    // Walk downwards to extract nullability
+    std::shared_ptr<Field> current_field = field;
+    nullable_.push_back(current_field->nullable);
+    while (current_field->type->num_children() > 0) {
+      if (current_field->type->num_children() > 1) {
+        return Status::NotImplemented(
+            "Fields with more than one child are not supported.");
+      } else {
+        current_field = current_field->type->child(0);
+      }
+      nullable_.push_back(current_field->nullable);
+    }
+
+    // Generate the levels.
+    if (nullable_.size() == 1) {
+      // We have a PrimitiveArray
+      *rep_levels = nullptr;
+      if (nullable_[0]) {
+        RETURN_NOT_OK(def_levels_buffer_->Resize(length * sizeof(int16_t)));
+        auto def_levels_ptr =
+            reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
+        if (array->null_count() == 0) {
+          std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+        } else {
+          const uint8_t* valid_bits = array->null_bitmap_data();
+          INIT_BITSET(valid_bits, offset);
+          for (int i = 0; i < length; i++) {
+            if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+              def_levels_ptr[i] = 1;
+            } else {
+              def_levels_ptr[i] = 0;
+            }
+            READ_NEXT_BITSET(valid_bits);
+          }
+        }
+        *def_levels = def_levels_buffer_;
+      } else {
+        *def_levels = nullptr;
+      }
+      *num_levels = length;
+    } else {
+      RETURN_NOT_OK(rep_levels_.Append(0));
+      RETURN_NOT_OK(HandleListEntries(0, 0, offset, length));
+
+      std::shared_ptr<Array> def_levels_array;
+      RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
+      *def_levels = static_cast<PrimitiveArray*>(def_levels_array.get())->data();
+
+      std::shared_ptr<Array> rep_levels_array;
+      RETURN_NOT_OK(rep_levels_.Finish(&rep_levels_array));
+      *rep_levels = static_cast<PrimitiveArray*>(rep_levels_array.get())->data();
+      *num_levels = rep_levels_array->length();
+    }
+
+    return Status::OK();
+  }
+
+  Status HandleList(int16_t def_level, int16_t rep_level, int64_t index) {
+    if (nullable_[rep_level]) {
+      if (null_counts_[rep_level] == 0 ||
+          BitUtil::GetBit(valid_bitmaps_[rep_level], index)) {
+        return HandleNonNullList(def_level + 1, rep_level, index);
+      } else {
+        return def_levels_.Append(def_level);
+      }
+    } else {
+      return HandleNonNullList(def_level, rep_level, index);
+    }
+  }
+
+  Status HandleNonNullList(int16_t def_level, int16_t rep_level, int64_t index) {
+    int32_t inner_offset = offsets_[rep_level][index];
+    int32_t inner_length = offsets_[rep_level][index + 1] - inner_offset;
+    int64_t recursion_level = rep_level + 1;
+    if (inner_length == 0) { return def_levels_.Append(def_level); }
+    if (recursion_level < static_cast<int64_t>(offsets_.size())) {
+      return HandleListEntries(def_level + 1, rep_level + 1, inner_offset, inner_length);
+    } else {
+      // We have reached the leaf (primitive values); handle the remaining nullability
+      for (int64_t i = 0; i < inner_length; i++) {
+        if (i > 0) { RETURN_NOT_OK(rep_levels_.Append(rep_level + 1)); }
+        if (nullable_[recursion_level] &&
+            ((null_counts_[recursion_level] == 0) ||
+                BitUtil::GetBit(valid_bitmaps_[recursion_level], inner_offset + i))) {
+          RETURN_NOT_OK(def_levels_.Append(def_level + 2));
+        } else {
+          // This def level can be produced in two cases:
+          //  * the elements are nullable and this one is null
+          //    (i.e. max_def_level = def_level + 2)
+          //  * the elements are non-nullable (i.e. max_def_level = def_level + 1)
+          RETURN_NOT_OK(def_levels_.Append(def_level + 1));
+        }
+      }
+      return Status::OK();
+    }
+  }
+
+  Status HandleListEntries(
+      int16_t def_level, int16_t rep_level, int64_t offset, int64_t length) {
+    for (int64_t i = 0; i < length; i++) {
+      if (i > 0) { RETURN_NOT_OK(rep_levels_.Append(rep_level)); }
+      RETURN_NOT_OK(HandleList(def_level, rep_level, offset + i));
+    }
+    return Status::OK();
+  }
+
+ private:
+  Int16Builder def_levels_;
+  std::shared_ptr<PoolBuffer> def_levels_buffer_;
+  Int16Builder rep_levels_;
+
+  std::vector<int64_t> null_counts_;
+  std::vector<const uint8_t*> valid_bitmaps_;
+  std::vector<const int32_t*> offsets_;
+  std::vector<bool> nullable_;
+
+  int32_t min_offset_idx_;
+  int32_t max_offset_idx_;
+  ::arrow::Type::type values_type_;
+  const Array* values_array_;
+};
+
 class FileWriter::Impl {
  public:
   Impl(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
 
   Status NewRowGroup(int64_t chunk_size);
   template <typename ParquetType, typename ArrowType>
-  Status TypedWriteBatch(
-      ColumnWriter* writer, const PrimitiveArray* data, int64_t offset, int64_t length);
+  Status TypedWriteBatch(ColumnWriter* writer, const Array* data, int64_t offset,
+      int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+      const int16_t* rep_levels);
 
   template <typename ParquetType, typename ArrowType>
-  Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t length,
-      const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
-      int64_t valid_bits_offset, const typename ArrowType::c_type* data_ptr);
+  Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
+      int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels,
+      const uint8_t* valid_bits, int64_t valid_bits_offset,
+      const typename ArrowType::c_type* data_ptr);
 
   // TODO(uwe): Same code as in reader.cc; the only difference is the name of the
   // temporary buffer
@@ -85,8 +288,7 @@ class FileWriter::Impl {
     return Status::OK();
   }
 
-  Status WriteFlatColumnChunk(const PrimitiveArray* data, int64_t offset, int64_t length);
-  Status WriteFlatColumnChunk(const BinaryArray* data, int64_t offset, int64_t length);
+  Status WriteColumnChunk(const Array* data, int64_t offset, int64_t length);
   Status Close();
 
   virtual ~Impl() {}
@@ -98,7 +300,6 @@ class FileWriter::Impl {
   // Buffer used for storing the data of an array converted to the physical type
   // as expected by parquet-cpp.
   PoolBuffer data_buffer_;
-  PoolBuffer def_levels_buffer_;
   std::unique_ptr<ParquetFileWriter> writer_;
   RowGroupWriter* row_group_writer_;
 };
@@ -116,47 +317,28 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
 }
 
 template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
-    const PrimitiveArray* data, int64_t offset, int64_t length) {
+Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, const Array* array,
+    int64_t offset, int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels) {
   using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
 
-  DCHECK((offset + length) <= data->length());
+  DCHECK((offset + num_values) <= array->length());
+  auto data = static_cast<const PrimitiveArray*>(array);
   auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset;
   auto writer = reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
-  if (writer->descr()->max_definition_level() == 0) {
+
+  if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
     // no nulls, just dump the data
     const ParquetCType* data_writer_ptr = nullptr;
     RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
-        data_ptr, length, &data_writer_ptr)));
-    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, data_writer_ptr));
-  } else if (writer->descr()->max_definition_level() == 1) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
-    int16_t* def_levels_ptr =
-        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-    if (data->null_count() == 0) {
-      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
-      const ParquetCType* data_writer_ptr = nullptr;
-      RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
-          data_ptr, length, &data_writer_ptr)));
-      PARQUET_CATCH_NOT_OK(
-          writer->WriteBatch(length, def_levels_ptr, nullptr, data_writer_ptr));
-    } else {
-      const uint8_t* valid_bits = data->null_bitmap_data();
-      INIT_BITSET(valid_bits, offset);
-      for (int i = 0; i < length; i++) {
-        if (bitset & (1 << bit_offset)) {
-          def_levels_ptr[i] = 1;
-        } else {
-          def_levels_ptr[i] = 0;
-        }
-        READ_NEXT_BITSET(valid_bits);
-      }
-      RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(
-          writer, length, def_levels_ptr, nullptr, valid_bits, offset, data_ptr)));
-    }
+        data_ptr, num_values, &data_writer_ptr)));
+    PARQUET_CATCH_NOT_OK(
+        writer->WriteBatch(num_levels, def_levels, rep_levels, data_writer_ptr));
   } else {
-    return Status::NotImplemented("no support for max definition level > 1 yet");
+    const uint8_t* valid_bits = data->null_bitmap_data();
+    RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, num_values,
+        num_levels, def_levels, rep_levels, valid_bits, offset, data_ptr)));
   }
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
@@ -164,22 +346,22 @@ Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
 
 template <typename ParquetType, typename ArrowType>
 Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writer,
-    int64_t length, const int16_t* def_levels, const int16_t* rep_levels,
-    const uint8_t* valid_bits, int64_t valid_bits_offset,
+    int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+    const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
     const typename ArrowType::c_type* data_ptr) {
   using ParquetCType = typename ParquetType::c_type;
 
-  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ParquetCType)));
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ParquetCType)));
   auto buffer_ptr = reinterpret_cast<ParquetCType*>(data_buffer_.mutable_data());
   INIT_BITSET(valid_bits, valid_bits_offset);
-  for (int i = 0; i < length; i++) {
-    if (bitset & (1 << bit_offset)) {
+  for (int i = 0; i < num_values; i++) {
+    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
       buffer_ptr[i] = static_cast<ParquetCType>(data_ptr[i]);
     }
     READ_NEXT_BITSET(valid_bits);
   }
   PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(
-      length, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
+      num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, buffer_ptr));
 
   return Status::OK();
 }
@@ -187,16 +369,17 @@ Status FileWriter::Impl::WriteNullableBatch(TypedColumnWriter<ParquetType>* writ
 #define NULLABLE_BATCH_FAST_PATH(ParquetType, ArrowType, CType)                        \
   template <>                                                                          \
   Status FileWriter::Impl::WriteNullableBatch<ParquetType, ArrowType>(                 \
-      TypedColumnWriter<ParquetType> * writer, int64_t length,                         \
+      TypedColumnWriter<ParquetType> * writer, int64_t num_values, int64_t num_levels, \
       const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits, \
       int64_t valid_bits_offset, const CType* data_ptr) {                              \
     PARQUET_CATCH_NOT_OK(writer->WriteBatchSpaced(                                     \
-        length, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr));     \
+        num_levels, def_levels, rep_levels, valid_bits, valid_bits_offset, data_ptr)); \
     return Status::OK();                                                               \
   }
 
 NULLABLE_BATCH_FAST_PATH(Int32Type, ::arrow::Int32Type, int32_t)
 NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::Int64Type, int64_t)
+NULLABLE_BATCH_FAST_PATH(Int64Type, ::arrow::TimestampType, int64_t)
 NULLABLE_BATCH_FAST_PATH(FloatType, ::arrow::FloatType, float)
 NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
 
@@ -206,99 +389,34 @@ NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
 //   ArrowType::c_type to ParquetType::c_type
 template <>
 Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
-    ColumnWriter* column_writer, const PrimitiveArray* data, int64_t offset,
-    int64_t length) {
-  DCHECK((offset + length) <= data->length());
-  RETURN_NOT_OK(data_buffer_.Resize(length));
+    ColumnWriter* column_writer, const Array* array, int64_t offset, int64_t num_values,
+    int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) {
+  DCHECK((offset + num_values) <= array->length());
+  RETURN_NOT_OK(data_buffer_.Resize(num_values));
+  auto data = static_cast<const BooleanArray*>(array);
   auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data());
   auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
   auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(column_writer);
-  if (writer->descr()->max_definition_level() == 0) {
-    // no nulls, just dump the data
-    for (int64_t i = 0; i < length; i++) {
-      buffer_ptr[i] = BitUtil::GetBit(data_ptr, offset + i);
-    }
-    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, nullptr, nullptr, buffer_ptr));
-  } else if (writer->descr()->max_definition_level() == 1) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
-    int16_t* def_levels_ptr =
-        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-    if (data->null_count() == 0) {
-      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
-      for (int64_t i = 0; i < length; i++) {
-        buffer_ptr[i] = BitUtil::GetBit(data_ptr, offset + i);
-      }
-      // TODO(PARQUET-644): write boolean values as a packed bitmap
-      PARQUET_CATCH_NOT_OK(
-          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
-    } else {
-      int buffer_idx = 0;
-      for (int i = 0; i < length; i++) {
-        if (data->IsNull(offset + i)) {
-          def_levels_ptr[i] = 0;
-        } else {
-          def_levels_ptr[i] = 1;
-          buffer_ptr[buffer_idx++] = BitUtil::GetBit(data_ptr, offset + i);
-        }
-      }
-      PARQUET_CATCH_NOT_OK(
-          writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
+
+  int buffer_idx = 0;
+  for (int i = 0; i < num_values; i++) {
+    if (!data->IsNull(offset + i)) {
+      buffer_ptr[buffer_idx++] = BitUtil::GetBit(data_ptr, offset + i);
     }
-  } else {
-    return Status::NotImplemented("no support for max definition level > 1 yet");
   }
+  PARQUET_CATCH_NOT_OK(
+      writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
 }
 
-Status FileWriter::Impl::Close() {
-  if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
-  PARQUET_CATCH_NOT_OK(writer_->Close());
-  return Status::OK();
-}
-
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)                            \
-  case ::arrow::Type::ENUM:                                                       \
-    return TypedWriteBatch<ParquetType, ArrowType>(writer, data, offset, length); \
-    break;
-
-Status FileWriter::Impl::WriteFlatColumnChunk(
-    const PrimitiveArray* data, int64_t offset, int64_t length) {
-  ColumnWriter* writer;
-  PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
-  switch (data->type_enum()) {
-    TYPED_BATCH_CASE(BOOL, ::arrow::BooleanType, BooleanType)
-    TYPED_BATCH_CASE(UINT8, ::arrow::UInt8Type, Int32Type)
-    TYPED_BATCH_CASE(INT8, ::arrow::Int8Type, Int32Type)
-    TYPED_BATCH_CASE(UINT16, ::arrow::UInt16Type, Int32Type)
-    TYPED_BATCH_CASE(INT16, ::arrow::Int16Type, Int32Type)
-    case ::arrow::Type::UINT32:
-      if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
-        // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need
-        // to use the larger Int64Type to store them lossless.
-        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(
-            writer, data, offset, length);
-      } else {
-        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(
-            writer, data, offset, length);
-      }
-      TYPED_BATCH_CASE(INT32, ::arrow::Int32Type, Int32Type)
-      TYPED_BATCH_CASE(UINT64, ::arrow::UInt64Type, Int64Type)
-      TYPED_BATCH_CASE(INT64, ::arrow::Int64Type, Int64Type)
-      TYPED_BATCH_CASE(TIMESTAMP, ::arrow::TimestampType, Int64Type)
-      TYPED_BATCH_CASE(FLOAT, ::arrow::FloatType, FloatType)
-      TYPED_BATCH_CASE(DOUBLE, ::arrow::DoubleType, DoubleType)
-    default:
-      return Status::NotImplemented(data->type()->ToString());
-  }
-}
-
-Status FileWriter::Impl::WriteFlatColumnChunk(
-    const BinaryArray* data, int64_t offset, int64_t length) {
-  ColumnWriter* column_writer;
-  PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
-  DCHECK((offset + length) <= data->length());
-  RETURN_NOT_OK(data_buffer_.Resize(length * sizeof(ByteArray)));
+template <>
+Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
+    ColumnWriter* column_writer, const Array* array, int64_t offset, int64_t num_values,
+    int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) {
+  DCHECK((offset + num_values) <= array->length());
+  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ByteArray)));
+  auto data = static_cast<const BinaryArray*>(array);
   auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data());
   // In the case of an array consisting of only empty strings or all null,
   // data->data() already points to a nullptr, thus data->data()->data() will
@@ -309,39 +427,36 @@ Status FileWriter::Impl::WriteFlatColumnChunk(
     DCHECK(data_ptr != nullptr);
   }
   auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(column_writer);
-  if (writer->descr()->max_definition_level() > 0) {
-    RETURN_NOT_OK(def_levels_buffer_.Resize(length * sizeof(int16_t)));
-  }
-  int16_t* def_levels_ptr = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
-  if (writer->descr()->max_definition_level() == 0 || data->null_count() == 0) {
+
+  if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
     // no nulls, just dump the data
-    for (int64_t i = 0; i < length; i++) {
+    for (int64_t i = 0; i < num_values; i++) {
       buffer_ptr[i] =
           ByteArray(data->value_length(i + offset), data_ptr + data->value_offset(i));
     }
-    if (writer->descr()->max_definition_level() > 0) {
-      std::fill(def_levels_ptr, def_levels_ptr + length, 1);
-    }
-    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
-  } else if (writer->descr()->max_definition_level() == 1) {
+    PARQUET_CATCH_NOT_OK(
+        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
+  } else {
     int buffer_idx = 0;
-    for (int64_t i = 0; i < length; i++) {
-      if (data->IsNull(offset + i)) {
-        def_levels_ptr[i] = 0;
-      } else {
-        def_levels_ptr[i] = 1;
+    for (int64_t i = 0; i < num_values; i++) {
+      if (!data->IsNull(offset + i)) {
         buffer_ptr[buffer_idx++] = ByteArray(
             data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
       }
     }
-    PARQUET_CATCH_NOT_OK(writer->WriteBatch(length, def_levels_ptr, nullptr, buffer_ptr));
-  } else {
-    return Status::NotImplemented("no support for max definition level > 1 yet");
+    PARQUET_CATCH_NOT_OK(
+        writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   }
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
 }
 
+Status FileWriter::Impl::Close() {
+  if (row_group_writer_ != nullptr) { PARQUET_CATCH_NOT_OK(row_group_writer_->Close()); }
+  PARQUET_CATCH_NOT_OK(writer_->Close());
+  return Status::OK();
+}
+
 FileWriter::FileWriter(MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer)
     : impl_(new FileWriter::Impl(pool, std::move(writer))) {}
 
@@ -349,22 +464,83 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) {
   return impl_->NewRowGroup(chunk_size);
 }
 
-Status FileWriter::WriteFlatColumnChunk(
+Status FileWriter::Impl::WriteColumnChunk(
+    const Array* data, int64_t offset, int64_t length) {
+  ColumnWriter* column_writer;
+  PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
+  DCHECK((offset + length) <= data->length());
+
+  int current_column_idx = row_group_writer_->current_column();
+  std::shared_ptr<::arrow::Schema> arrow_schema;
+  RETURN_NOT_OK(
+      FromParquetSchema(writer_->schema(), {current_column_idx - 1}, &arrow_schema));
+  LevelBuilder level_builder(pool_);
+  std::shared_ptr<Buffer> def_levels_buffer;
+  std::shared_ptr<Buffer> rep_levels_buffer;
+  int64_t values_offset;
+  ::arrow::Type::type values_type;
+  int64_t num_levels;
+  int64_t num_values;
+  const Array* values_array;
+  RETURN_NOT_OK(level_builder.GenerateLevels(data, offset, length, arrow_schema->field(0),
+      &values_offset, &values_type, &num_values, &num_levels, &def_levels_buffer,
+      &rep_levels_buffer, &values_array));
+  const int16_t* def_levels = nullptr;
+  if (def_levels_buffer) {
+    def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
+  }
+  const int16_t* rep_levels = nullptr;
+  if (rep_levels_buffer) {
+    rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data());
+  }
+
+#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType)                              \
+  case ::arrow::Type::ArrowEnum:                                                         \
+    return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(column_writer, values_array, \
+        values_offset, num_values, num_levels, def_levels, rep_levels);                  \
+    break;
+
+  switch (values_type) {
+    case ::arrow::Type::UINT32: {
+      if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
+        // Parquet 1.0 readers cannot read the UINT_32 logical type. Thus we need
+        // to use the larger Int64Type to store the values losslessly.
+        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(column_writer,
+            values_array, values_offset, num_values, num_levels, def_levels, rep_levels);
+      } else {
+        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(column_writer,
+            values_array, values_offset, num_values, num_levels, def_levels, rep_levels);
+      }
+    }
+      WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
+      WRITE_BATCH_CASE(INT8, Int8Type, Int32Type)
+      WRITE_BATCH_CASE(UINT8, UInt8Type, Int32Type)
+      WRITE_BATCH_CASE(INT16, Int16Type, Int32Type)
+      WRITE_BATCH_CASE(UINT16, UInt16Type, Int32Type)
+      WRITE_BATCH_CASE(INT32, Int32Type, Int32Type)
+      WRITE_BATCH_CASE(INT64, Int64Type, Int64Type)
+      WRITE_BATCH_CASE(TIMESTAMP, TimestampType, Int64Type)
+      WRITE_BATCH_CASE(UINT64, UInt64Type, Int64Type)
+      WRITE_BATCH_CASE(FLOAT, FloatType, FloatType)
+      WRITE_BATCH_CASE(DOUBLE, DoubleType, DoubleType)
+      WRITE_BATCH_CASE(BINARY, BinaryType, ByteArrayType)
+      WRITE_BATCH_CASE(STRING, BinaryType, ByteArrayType)
+    default:
+      std::stringstream ss;
+      ss << "Data type not supported as list value: " << values_array->type()->ToString();
+      return Status::NotImplemented(ss.str());
+  }
+
+  PARQUET_CATCH_NOT_OK(column_writer->Close());
+
+  return Status::OK();
+}
+
+Status FileWriter::WriteColumnChunk(
     const ::arrow::Array* array, int64_t offset, int64_t length) {
   int64_t real_length = length;
   if (length == -1) { real_length = array->length(); }
-  if (array->type_enum() == ::arrow::Type::STRING ||
-      array->type_enum() == ::arrow::Type::BINARY) {
-    auto binary_array = static_cast<const ::arrow::BinaryArray*>(array);
-    DCHECK(binary_array);
-    return impl_->WriteFlatColumnChunk(binary_array, offset, real_length);
-  } else {
-    auto primitive_array = dynamic_cast<const PrimitiveArray*>(array);
-    if (!primitive_array) {
-      return Status::NotImplemented("Table must consist of PrimitiveArray instances");
-    }
-    return impl_->WriteFlatColumnChunk(primitive_array, offset, real_length);
-  }
+  return impl_->WriteColumnChunk(array, offset, real_length);
 }
 
 Status FileWriter::Close() {
@@ -377,7 +553,7 @@ MemoryPool* FileWriter::memory_pool() const {
 
 FileWriter::~FileWriter() {}
 
-Status WriteFlatTable(const Table* table, MemoryPool* pool,
+Status WriteTable(const Table* table, MemoryPool* pool,
     const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties) {
   std::shared_ptr<SchemaDescriptor> parquet_schema;
@@ -400,8 +576,8 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool,
     int64_t size = std::min(chunk_size, table->num_rows() - offset);
     RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
     for (int i = 0; i < table->num_columns(); i++) {
-      std::shared_ptr<::arrow::Array> array = table->column(i)->data()->chunk(0);
-      RETURN_NOT_OK_ELSE(writer.WriteFlatColumnChunk(array.get(), offset, size),
+      std::shared_ptr<Array> array = table->column(i)->data()->chunk(0);
+      RETURN_NOT_OK_ELSE(writer.WriteColumnChunk(array.get(), offset, size),
           PARQUET_IGNORE_NOT_OK(writer.Close()));
     }
   }
@@ -409,11 +585,11 @@ Status WriteFlatTable(const Table* table, MemoryPool* pool,
   return writer.Close();
 }
 
-Status WriteFlatTable(const Table* table, MemoryPool* pool,
+Status WriteTable(const Table* table, MemoryPool* pool,
     const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties) {
   auto wrapper = std::make_shared<ArrowOutputStream>(sink);
-  return WriteFlatTable(table, pool, wrapper, chunk_size, properties);
+  return WriteTable(table, pool, wrapper, chunk_size, properties);
 }
 
 }  // namespace arrow
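
For readers new to Dremel-style level encoding, the following standalone sketch (plain C++17, not part of this commit) reproduces the definition/repetition levels that LevelBuilder emits for a single nullable list<int32> column with nullable elements (max_definition_level = 3): def 0 marks a null list, def 1 an empty list, def 2 a null element, def 3 a present value; rep 0 starts a new list and rep 1 continues one.

#include <cstdint>
#include <iostream>
#include <optional>
#include <vector>

// One nullable list of nullable int32 values; std::nullopt models a null.
using NullableList = std::optional<std::vector<std::optional<int32_t>>>;

int main() {
  // Example column: [[1, null], null, [], [7]]
  std::vector<NullableList> column = {
      std::vector<std::optional<int32_t>>{1, std::nullopt}, std::nullopt,
      std::vector<std::optional<int32_t>>{}, std::vector<std::optional<int32_t>>{7}};

  std::vector<int16_t> def_levels;
  std::vector<int16_t> rep_levels;
  for (const auto& list : column) {
    if (!list) {  // null list
      def_levels.push_back(0);
      rep_levels.push_back(0);
    } else if (list->empty()) {  // non-null but empty list
      def_levels.push_back(1);
      rep_levels.push_back(0);
    } else {
      for (size_t i = 0; i < list->size(); i++) {
        // Present value vs. null element, cf. HandleNonNullList above.
        def_levels.push_back((*list)[i] ? 3 : 2);
        rep_levels.push_back(i == 0 ? 0 : 1);  // rep 0 starts a new list
      }
    }
  }
  // Prints "3/0 2/1 0/0 1/0 3/0" (def/rep per entry) for the example column.
  for (size_t i = 0; i < def_levels.size(); i++) {
    std::cout << def_levels[i] << "/" << rep_levels[i] << " ";
  }
  std::cout << std::endl;
  return 0;
}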

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/arrow/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index a82c2f6..4a39c99 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -50,7 +50,7 @@ class PARQUET_EXPORT FileWriter {
   FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
 
   ::arrow::Status NewRowGroup(int64_t chunk_size);
-  ::arrow::Status WriteFlatColumnChunk(
+  ::arrow::Status WriteColumnChunk(
       const ::arrow::Array* data, int64_t offset = 0, int64_t length = -1);
   ::arrow::Status Close();
 
@@ -64,16 +64,16 @@ class PARQUET_EXPORT FileWriter {
 };
 
 /**
- * Write a flat Table to Parquet.
+ * Write a Table to Parquet.
  *
- * The table shall only consist of nullable, non-repeated columns of primitive type.
+ * The table shall only consist of columns of primitive type or lists of primitive values.
  */
-::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
+::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table* table,
     ::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink,
     int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
 
-::arrow::Status PARQUET_EXPORT WriteFlatTable(const ::arrow::Table* table,
+::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table* table,
     ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink,
     int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
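
A minimal usage sketch of the renamed entry point. MakeListTable() and OpenOutputStream() are hypothetical helpers standing in for table construction and file I/O; the WriteTable call itself matches the declaration above.

#include <memory>
#include <string>

#include "arrow/api.h"
#include "arrow/io/interfaces.h"
#include "parquet/arrow/writer.h"

// Hypothetical helpers (not part of this commit): the first builds an
// ::arrow::Table with a single list<int32> column, the second opens an
// ::arrow::io::OutputStream for the destination file.
std::shared_ptr<::arrow::Table> MakeListTable();
std::shared_ptr<::arrow::io::OutputStream> OpenOutputStream(const std::string& path);

::arrow::Status WriteExample() {
  std::shared_ptr<::arrow::Table> table = MakeListTable();
  auto sink = OpenOutputStream("example.parquet");
  // One row group per 1024 rows; properties default to
  // parquet::default_writer_properties().
  return parquet::arrow::WriteTable(
      table.get(), ::arrow::default_memory_pool(), sink, 1024);
}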

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index d410b5f..6bf6651 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -46,14 +46,21 @@ template <typename T>
 static inline bool vector_equal_with_def_levels(const vector<T>& left,
     const vector<int16_t> def_levels, int16_t max_def_levels, const vector<T>& right) {
   size_t i_left = 0;
-  for (size_t i = 0; i < right.size(); ++i) {
-    if (def_levels[i] != max_def_levels) { continue; }
-    if (left[i_left] != right[i]) {
-      std::cerr << "index " << i << " left was " << left[i_left] << " right was "
-                << right[i] << std::endl;
-      return false;
+  size_t i_right = 0;
+  for (size_t i = 0; i < def_levels.size(); i++) {
+    if (def_levels[i] == max_def_levels) {
+      // Compare
+      if (left[i_left] != right[i_right]) {
+        std::cerr << "index " << i << " left was " << left[i_left] << " right was "
+                  << right[i] << std::endl;
+        return false;
+      }
+      i_left++;
+      i_right++;
+    } else if (def_levels[i] == (max_def_levels - 1)) {
+      // Null entry on the lowest nested level
+      i_right++;
     }
-    i_left++;
   }
 
   return true;
@@ -107,7 +114,10 @@ class TestPrimitiveReader : public ::testing::Test {
     vector<uint8_t> valid_bits(num_levels_, 255);
     int total_values_read = 0;
     int batch_actual = 0;
-    int null_count = -1;
+    int levels_actual = 0;
+    int64_t null_count = -1;
+    int64_t levels_read = 0;
+    int64_t values_read;
 
     Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
     int32_t batch_size = 8;
@@ -116,14 +126,17 @@ class TestPrimitiveReader : public ::testing::Test {
     // 1) batch_size < page_size (multiple ReadBatch from a single page)
     // 2) batch_size > page_size (BatchRead limits to a single page)
     do {
-      batch = reader->ReadBatchSpaced(batch_size, dresult.data() + batch_actual,
-          rresult.data() + batch_actual, vresult.data() + batch_actual, &null_count,
-          valid_bits.data() + batch_actual, 0);
+      batch = reader->ReadBatchSpaced(batch_size, dresult.data() + levels_actual,
+          rresult.data() + levels_actual, vresult.data() + batch_actual,
+          valid_bits.data() + batch_actual, 0, &levels_read, &values_read, &null_count);
       total_values_read += batch - null_count;
       batch_actual += batch;
+      levels_actual += levels_read;
       batch_size = std::max(batch_size * 2, 4096);
-    } while (batch > 0);
+    } while ((batch > 0) || (levels_read > 0));
 
+    ASSERT_EQ(num_levels_, levels_actual);
+    ASSERT_EQ(num_values_, total_values_read);
     if (max_def_level_ > 0) {
       ASSERT_TRUE(vector_equal(def_levels_, dresult));
       ASSERT_TRUE(
@@ -132,11 +145,9 @@ class TestPrimitiveReader : public ::testing::Test {
       ASSERT_TRUE(vector_equal(values_, vresult));
     }
     if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
-    ASSERT_EQ(num_levels_, batch_actual);
-    ASSERT_EQ(num_values_, total_values_read);
     // catch improper writes at EOS
-    batch_actual = reader->ReadBatchSpaced(
-        5, nullptr, nullptr, nullptr, &null_count, valid_bits.data(), 0);
+    batch_actual = reader->ReadBatchSpaced(5, nullptr, nullptr, nullptr,
+        valid_bits.data(), 0, &levels_read, &values_read, &null_count);
     ASSERT_EQ(0, batch_actual);
     ASSERT_EQ(0, null_count);
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 90c0761..7924e55 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -130,25 +130,42 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
       int64_t* values_read);
 
   /// Read a batch of repetition levels, definition levels, and values from the
-  /// column and leave spaces for null entries in the values buffer.
+  /// column and leave spaces for null entries on the lowest level in the values
+  /// buffer.
   ///
   /// In comparison to ReadBatch, the length of the repetition and definition levels
-  /// is the same as of the number of values read.
+  /// is the same as the number of values read when max_definition_level == 1.
+  /// In the case of max_definition_level > 1, there are more repetition and
+  /// definition levels than values, but the values still include the null
+  /// entries with definition_level == (max_definition_level - 1).
   ///
   /// To fully exhaust a row group, you must read batches until the number of
   /// values read reaches the number of stored values according to the metadata.
   ///
-  /// @param valid_bits Memory allocated for a bitmap that indicates if
+  /// @param batch_size the number of levels to read
+  /// @param[out] def_levels The Parquet definition levels; the output has
+  ///   length levels_read.
+  /// @param[out] rep_levels The Parquet repetition levels; the output has
+  ///   length levels_read.
+  /// @param[out] values The values in the lowest nested level, including
+  ///   spacing for nulls on the lowest level; the output has length
+  ///   values_read.
+  /// @param[out] valid_bits Memory allocated for a bitmap that indicates if
   ///   the row is null or on the maximum definition level. For performance
   ///   reasons the underlying buffer should be able to store 1 bit more than
   ///   required. If this requires an additional byte, this byte is only read
   ///   but never written to.
   /// @param valid_bits_offset The offset in bits of the valid_bits where the
-  ///  first relevant bit resides.
-  ///
-  /// @return actual number of levels read
+  ///   first relevant bit resides.
+  /// @param[out] levels_read The number of repetition/definition levels that were read.
+  /// @param[out] values_read The number of values read; this includes all
+  ///   non-null entries as well as all null entries on the lowest level
+  ///   (i.e. definition_level == max_definition_level - 1).
+  /// @param[out] null_count The number of nulls on the lowest level
+  ///   (i.e. (values_read - null_count) is the total number of non-null entries).
   int64_t ReadBatchSpaced(int batch_size, int16_t* def_levels, int16_t* rep_levels,
-      T* values, int* null_count, uint8_t* valid_bits, int64_t valid_bits_offset);
+      T* values, uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
+      int64_t* values_read, int64_t* null_count);
 
   // Skip reading levels
   // Returns the number of levels skipped
@@ -244,8 +261,8 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_
 }
 
 inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
-    int16_t max_definition_level, int* null_count, uint8_t* valid_bits,
-    int64_t valid_bits_offset) {
+    int16_t max_definition_level, int64_t* values_read, int64_t* null_count,
+    uint8_t* valid_bits, int64_t valid_bits_offset) {
   int byte_offset = valid_bits_offset / 8;
   int bit_offset = valid_bits_offset % 8;
   uint8_t bitset = valid_bits[byte_offset];
@@ -253,9 +270,11 @@ inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_
   for (int i = 0; i < num_def_levels; ++i) {
     if (def_levels[i] == max_definition_level) {
       bitset |= (1 << bit_offset);
-    } else {
+    } else if (def_levels[i] == (max_definition_level - 1)) {
       bitset &= ~(1 << bit_offset);
       *null_count += 1;
+    } else {
+      continue;
     }
 
     bit_offset++;
@@ -268,14 +287,18 @@ inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_
     }
   }
   if (bit_offset != 0) { valid_bits[byte_offset] = bitset; }
+  *values_read = (bit_offset + byte_offset * 8 - valid_bits_offset);
 }
 
 template <typename DType>
 inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int batch_size,
-    int16_t* def_levels, int16_t* rep_levels, T* values, int* null_count_out,
-    uint8_t* valid_bits, int64_t valid_bits_offset) {
+    int16_t* def_levels, int16_t* rep_levels, T* values, uint8_t* valid_bits,
+    int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read,
+    int64_t* null_count_out) {
   // HasNext invokes ReadNewPage
   if (!HasNext()) {
+    *levels_read = 0;
+    *values_read = 0;
     *null_count_out = 0;
     return 0;
   }
@@ -297,15 +320,28 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int batch_size,
       }
     }
 
-    // TODO: Move this into the DefinitionLevels reader
-    int null_count = 0;
-    int16_t max_definition_level = descr_->max_definition_level();
-    DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
-        &null_count, valid_bits, valid_bits_offset);
+    int64_t null_count = 0;
+    if (descr_->schema_node()->is_required()) {
+      // Node is required so there are no null entries on the lowest nesting level.
+      int values_to_read = 0;
+      for (int64_t i = 0; i < num_def_levels; ++i) {
+        if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; }
+      }
+      total_values = ReadValues(values_to_read, values);
+      for (int64_t i = 0; i < total_values; i++) {
+        ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
+      }
+      *values_read = total_values;
+    } else {
+      int16_t max_definition_level = descr_->max_definition_level();
+      DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
+          values_read, &null_count, valid_bits, valid_bits_offset);
+      total_values = ReadValuesSpaced(
+          *values_read, values, null_count, valid_bits, valid_bits_offset);
+    }
+    *levels_read = num_def_levels;
     *null_count_out = null_count;
 
-    total_values = ReadValuesSpaced(
-        num_def_levels, values, null_count, valid_bits, valid_bits_offset);
   } else {
     // Required field, read all values
     total_values = ReadValues(batch_size, values);
@@ -313,9 +349,10 @@ inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int batch_size,
       ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
     }
     *null_count_out = 0;
+    *levels_read = total_values;
   }
 
-  num_decoded_values_ += total_values;
+  num_decoded_values_ += *levels_read;
   return total_values;
 }
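
A hedged sketch of how a caller drains a column with the new ReadBatchSpaced signature, mirroring the loop in the test above; the reader pointer and batch size are illustrative.

#include <cstdint>
#include <vector>

#include "parquet/column/reader.h"

void DrainColumn(parquet::Int32Reader* reader) {
  const int batch_size = 1024;
  std::vector<int16_t> def_levels(batch_size);
  std::vector<int16_t> rep_levels(batch_size);
  std::vector<int32_t> values(batch_size);
  // One spare byte so the bitmap reader may touch one bit past the end.
  std::vector<uint8_t> valid_bits(batch_size / 8 + 1, 0);
  int64_t levels_read = 0;
  int64_t values_read = 0;
  int64_t null_count = 0;
  int64_t batch = 0;
  do {
    // Buffers are reused each round; a real consumer would copy them out.
    batch = reader->ReadBatchSpaced(batch_size, def_levels.data(),
        rep_levels.data(), values.data(), valid_bits.data(), 0, &levels_read,
        &values_read, &null_count);
    // values[0..values_read) is spaced: null slots at the lowest level are
    // left in place and flagged as 0 bits in valid_bits.
  } while ((batch > 0) || (levels_read > 0));
}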
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/column/statistics.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc
index df7a308..7d6aebb 100644
--- a/src/parquet/column/statistics.cc
+++ b/src/parquet/column/statistics.cc
@@ -124,13 +124,13 @@ void TypedRowGroupStatistics<DType>::UpdateSpaced(const T* values,
   int64_t length = num_null + num_not_null;
   int64_t i = 0;
   for (; i < length; i++) {
-    if (bitset & (1 << bit_offset)) { break; }
+    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { break; }
     READ_NEXT_BITSET(valid_bits);
   }
   T min = values[i];
   T max = values[i];
   for (; i < length; i++) {
-    if (bitset & (1 << bit_offset)) {
+    if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
       if (compare(values[i], min)) {
         min = values[i];
       } else if (compare(max, values[i])) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index f06ac30..315c42f 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -361,18 +361,24 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatch(int64_t num_values,
 template <typename DType>
 inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values,
     const int16_t* def_levels, const int16_t* rep_levels, const uint8_t* valid_bits,
-    int64_t valid_bits_offset, const T* values) {
+    int64_t valid_bits_offset, const T* values, int64_t* num_spaced_written) {
   int64_t values_to_write = 0;
+  int64_t spaced_values_to_write = 0;
   // If the field is required and non-repeated, there are no definition levels
   if (descr_->max_definition_level() > 0) {
+    // Minimum definition level at which values are written into the spaced buffer
+    int16_t min_spaced_def_level = descr_->max_definition_level();
+    if (descr_->schema_node()->is_optional()) { min_spaced_def_level--; }
     for (int64_t i = 0; i < num_values; ++i) {
       if (def_levels[i] == descr_->max_definition_level()) { ++values_to_write; }
+      if (def_levels[i] >= min_spaced_def_level) { ++spaced_values_to_write; }
     }
 
     WriteDefinitionLevels(num_values, def_levels);
   } else {
     // Required field, write all values
     values_to_write = num_values;
+    spaced_values_to_write = num_values;
   }
 
   // Not present for non-repeated fields
@@ -393,7 +399,12 @@ inline int64_t TypedColumnWriter<DType>::WriteMiniBatchSpaced(int64_t num_values
     throw ParquetException("More rows were written in the column chunk than expected");
   }
 
-  WriteValuesSpaced(num_values, valid_bits, valid_bits_offset, values);
+  if (descr_->schema_node()->is_optional()) {
+    WriteValuesSpaced(spaced_values_to_write, valid_bits, valid_bits_offset, values);
+  } else {
+    WriteValues(values_to_write, values);
+  }
+  *num_spaced_written = spaced_values_to_write;
 
   if (page_statistics_ != nullptr) {
     page_statistics_->UpdateSpaced(values, valid_bits, valid_bits_offset, values_to_write,
@@ -447,15 +458,20 @@ void TypedColumnWriter<DType>::WriteBatchSpaced(int64_t num_values,
   int64_t write_batch_size = properties_->write_batch_size();
   int num_batches = num_values / write_batch_size;
   int64_t num_remaining = num_values % write_batch_size;
+  int64_t num_spaced_written = 0;
+  int64_t values_offset = 0;
   for (int round = 0; round < num_batches; round++) {
     int64_t offset = round * write_batch_size;
     WriteMiniBatchSpaced(write_batch_size, &def_levels[offset], &rep_levels[offset],
-        valid_bits, valid_bits_offset + offset, &values[offset]);
+        valid_bits, valid_bits_offset + values_offset, values + values_offset,
+        &num_spaced_written);
+    values_offset += num_spaced_written;
   }
   // Write the remaining values
   int64_t offset = num_batches * write_batch_size;
   WriteMiniBatchSpaced(num_remaining, &def_levels[offset], &rep_levels[offset],
-      valid_bits, valid_bits_offset + offset, &values[offset]);
+      valid_bits, valid_bits_offset + values_offset, values + values_offset,
+      &num_spaced_written);
 }
 
 template <typename DType>
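
To make the two counters above concrete, here is a small standalone example; the assumed shape is an optional leaf inside a list (max_definition_level = 3), with the def levels a column like [[1, null], null, [], [7]] would produce.

#include <cassert>
#include <cstdint>
#include <vector>

int main() {
  const int16_t max_def_level = 3;                         // nullable leaf in a list
  const int16_t min_spaced_def_level = max_def_level - 1;  // leaf is optional
  const std::vector<int16_t> def_levels = {3, 2, 0, 1, 3};

  int64_t values_to_write = 0;         // actual non-null values
  int64_t spaced_values_to_write = 0;  // slots in the spaced values buffer
  for (int16_t def : def_levels) {
    if (def == max_def_level) { ++values_to_write; }
    if (def >= min_spaced_def_level) { ++spaced_values_to_write; }
  }
  assert(values_to_write == 2);         // the two def == 3 entries
  assert(spaced_values_to_write == 3);  // plus the null element with def == 2
  return 0;
}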

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 094c65b..6ab84b2 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -181,7 +181,7 @@ class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
 
   int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
       const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
-      const T* values);
+      const T* values, int64_t* num_spaced_written);
 
   typedef Encoder<DType> EncoderType;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index d465300..7128500 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -243,7 +243,7 @@ class DictEncoder : public Encoder<DType> {
       int64_t valid_bits_offset) override {
     INIT_BITSET(valid_bits, valid_bits_offset);
     for (int32_t i = 0; i < num_values; i++) {
-      if (bitset & (1 << bit_offset)) { Put(src[i]); }
+      if (bitset_valid_bits & (1 << bit_offset_valid_bits)) { Put(src[i]); }
       READ_NEXT_BITSET(valid_bits);
     }
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
index 35a36d3..1c06574 100644
--- a/src/parquet/encodings/encoder.h
+++ b/src/parquet/encodings/encoder.h
@@ -52,7 +52,9 @@ class Encoder {
     INIT_BITSET(valid_bits, valid_bits_offset);
     T* data = reinterpret_cast<T*>(buffer.mutable_data());
     for (int32_t i = 0; i < num_values; i++) {
-      if (bitset & (1 << bit_offset)) { data[num_valid_values++] = src[i]; }
+      if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
+        data[num_valid_values++] = src[i];
+      }
       READ_NEXT_BITSET(valid_bits);
     }
     Put(data, num_valid_values);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 0fa4e44..de7c4e4 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -581,6 +581,8 @@ class RowGroupMetaDataBuilder::RowGroupMetaDataBuilderImpl {
     return column_builder_ptr;
   }
 
+  int current_column() { return current_column_; }
+
   void Finish(int64_t total_bytes_written) {
     if (!(current_column_ == schema_->num_columns())) {
       std::stringstream ss;
@@ -635,6 +637,10 @@ ColumnChunkMetaDataBuilder* RowGroupMetaDataBuilder::NextColumnChunk() {
   return impl_->NextColumnChunk();
 }
 
+int RowGroupMetaDataBuilder::current_column() const {
+  return impl_->current_column();
+}
+
 int RowGroupMetaDataBuilder::num_columns() {
   return impl_->num_columns();
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 1f8b09f..eab7fc6 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -196,6 +196,7 @@ class PARQUET_EXPORT RowGroupMetaDataBuilder {
 
   ColumnChunkMetaDataBuilder* NextColumnChunk();
   int num_columns();
+  int current_column() const;
 
   // commit the metadata
   void Finish(int64_t total_bytes_written);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 877f668..8c1316b 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -174,6 +174,10 @@ ColumnWriter* RowGroupSerializer::NextColumn() {
   return current_column_writer_.get();
 }
 
+int RowGroupSerializer::current_column() const {
+  return metadata_->current_column();
+}
+
 void RowGroupSerializer::Close() {
   if (!closed_) {
     closed_ = true;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index f803f92..0140c5b 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -82,6 +82,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   int64_t num_rows() const override;
 
   ColumnWriter* NextColumn() override;
+  int current_column() const override;
   void Close() override;
 
  private:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
index a381c22..8d7d208 100644
--- a/src/parquet/file/writer.cc
+++ b/src/parquet/file/writer.cc
@@ -41,6 +41,10 @@ ColumnWriter* RowGroupWriter::NextColumn() {
   return contents_->NextColumn();
 }
 
+int RowGroupWriter::current_column() {
+  return contents_->current_column();
+}
+
 // ----------------------------------------------------------------------
 // ParquetFileWriter public API
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index 07ccb51..ab32137 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -42,6 +42,7 @@ class PARQUET_EXPORT RowGroupWriter {
     virtual int64_t num_rows() const = 0;
 
     virtual ColumnWriter* NextColumn() = 0;
+    virtual int current_column() const = 0;
     virtual void Close() = 0;
   };
 
@@ -56,6 +57,8 @@ class PARQUET_EXPORT RowGroupWriter {
    * modified anymore.
    */
   ColumnWriter* NextColumn();
+  /// Index of the column that is currently being written
+  int current_column();
   void Close();
 
   int num_columns() const;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/util/bit-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-util.h b/src/parquet/util/bit-util.h
index ca094bc..d1e81a3 100644
--- a/src/parquet/util/bit-util.h
+++ b/src/parquet/util/bit-util.h
@@ -32,17 +32,17 @@
 
 namespace parquet {
 
-#define INIT_BITSET(valid_bits_vector, valid_bits_index) \
-  int byte_offset = valid_bits_index / 8;                \
-  int bit_offset = valid_bits_index % 8;                 \
-  uint8_t bitset = valid_bits_vector[byte_offset];
-
-#define READ_NEXT_BITSET(valid_bits_vector)  \
-  bit_offset++;                              \
-  if (bit_offset == 8) {                     \
-    bit_offset = 0;                          \
-    byte_offset++;                           \
-    bitset = valid_bits_vector[byte_offset]; \
+#define INIT_BITSET(valid_bits_vector, valid_bits_index)      \
+  int byte_offset_##valid_bits_vector = valid_bits_index / 8; \
+  int bit_offset_##valid_bits_vector = valid_bits_index % 8;  \
+  uint8_t bitset_##valid_bits_vector = valid_bits_vector[byte_offset_##valid_bits_vector];
+
+#define READ_NEXT_BITSET(valid_bits_vector)                                          \
+  bit_offset_##valid_bits_vector++;                                                  \
+  if (bit_offset_##valid_bits_vector == 8) {                                         \
+    bit_offset_##valid_bits_vector = 0;                                              \
+    byte_offset_##valid_bits_vector++;                                               \
+    bitset_##valid_bits_vector = valid_bits_vector[byte_offset_##valid_bits_vector]; \
   }
 
 // TODO(wesm): The source from Impala was depending on boost::make_unsigned
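
A short sketch of why the suffixed names matter: with the old fixed bitset/bit_offset names, two INIT_BITSET invocations in one scope collided; the token-pasted names let two bitmap cursors coexist. The macros above are assumed to be in scope.

#include <cstdint>
#include <iostream>

int main() {
  // One spare byte each, per the valid_bits contract in reader.h.
  const uint8_t valid_bits[2] = {0xAD, 0};  // bits 0,2,3,5,7 set
  const uint8_t other_bits[2] = {0xCB, 0};  // bits 0,1,3,6,7 set
  INIT_BITSET(valid_bits, 0);  // defines bitset_valid_bits / bit_offset_valid_bits
  INIT_BITSET(other_bits, 0);  // defines bitset_other_bits / bit_offset_other_bits
  int both = 0;
  for (int i = 0; i < 8; i++) {
    if ((bitset_valid_bits & (1 << bit_offset_valid_bits)) &&
        (bitset_other_bits & (1 << bit_offset_other_bits))) {
      both++;
    }
    READ_NEXT_BITSET(valid_bits);
    READ_NEXT_BITSET(other_bits);
  }
  std::cout << both << " positions set in both bitmaps" << std::endl;  // prints 3
  return 0;
}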

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ad56e7ae/src/parquet/util/rle-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/rle-encoding.h b/src/parquet/util/rle-encoding.h
index 9b2bf56..aa888ad 100644
--- a/src/parquet/util/rle-encoding.h
+++ b/src/parquet/util/rle-encoding.h
@@ -352,7 +352,7 @@ inline int RleDecoder::GetBatchWithDictSpaced(const Vector<T>& dictionary, T* va
   INIT_BITSET(valid_bits, valid_bits_offset);
 
   while (values_read < batch_size) {
-    bool is_valid = (bitset & (1 << bit_offset));
+    bool is_valid = (bitset_valid_bits & (1 << bit_offset_valid_bits));
     READ_NEXT_BITSET(valid_bits);
 
     if (is_valid) {
@@ -366,7 +366,7 @@ inline int RleDecoder::GetBatchWithDictSpaced(const Vector<T>& dictionary, T* va
         repeat_count_--;
 
         while (repeat_count_ > 0 && (values_read + repeat_batch) < batch_size) {
-          if (bitset & (1 << bit_offset)) {
+          if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
             repeat_count_--;
           } else {
             remaining_nulls--;
@@ -394,7 +394,7 @@ inline int RleDecoder::GetBatchWithDictSpaced(const Vector<T>& dictionary, T* va
 
         // Read the first bitset to the end
         while (literals_read < literal_batch) {
-          if (bitset & (1 << bit_offset)) {
+          if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
             values[values_read + literals_read + skipped] =
                 dictionary[indices[literals_read]];
             literals_read++;

