parquet-commits mailing list archives

From: w...@apache.org
Subject: parquet-cpp git commit: PARQUET-867: Support writing sliced Arrow arrays
Date: Thu, 09 Feb 2017 20:25:50 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 7a7234276 -> b2c734b89


PARQUET-867: Support writing sliced Arrow arrays

Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>

Closes #238 from xhochy/PARQUET-867 and squashes the following commits:

e9f79d3 [Korn, Uwe] Remove alt-space from code
5fe849d [Korn, Uwe] Address review comments
fa7a1e0 [Korn, Uwe] Use Slice instead of offset,length
d68e9db [Korn, Uwe] Update Arrow hash
61e3ac0 [Korn, Uwe] Use references instead of pointers
d3c4ec6 [Korn, Uwe] Remove offset,length from public interface
ec0577f [Korn, Uwe] PARQUET-867: Support writing sliced Arrow arrays
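
A minimal sketch of the public API after this change: WriteTable and
WriteColumnChunk now take references, and a row group is selected with a
zero-copy Array::Slice instead of an (offset, length) pair. The helper below
is hypothetical (not part of the commit) and assumes a single-chunk table:

    #include <algorithm>
    #include <memory>

    #include <arrow/api.h>
    #include <parquet/arrow/writer.h>

    ::arrow::Status WriteInRowGroups(const ::arrow::Table& table,
        ::parquet::arrow::FileWriter* writer, int64_t chunk_size) {
      std::shared_ptr<::arrow::Array> column = table.column(0)->data()->chunk(0);
      for (int64_t offset = 0; offset < column->length(); offset += chunk_size) {
        int64_t size = std::min(chunk_size, column->length() - offset);
        ::arrow::Status st = writer->NewRowGroup(size);
        if (!st.ok()) { return st; }
        // Array::Slice records only (offset, length) and shares the parent's
        // buffers; the writer now honors the slice offset internally.
        st = writer->WriteColumnChunk(*column->Slice(offset, size));
        if (!st.ok()) { return st; }
      }
      return writer->Close();
    }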


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/b2c734b8
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/b2c734b8
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/b2c734b8

Branch: refs/heads/master
Commit: b2c734b89e8e766d84c062345552e99b9b2eec04
Parents: 7a72342
Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>
Authored: Thu Feb 9 15:25:36 2017 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Thu Feb 9 15:25:36 2017 -0500

----------------------------------------------------------------------
 cmake_modules/ThirdpartyToolchain.cmake         |   2 +-
 .../arrow/arrow-reader-writer-benchmark.cc      |   4 +-
 src/parquet/arrow/arrow-reader-writer-test.cc   |  75 ++++++---
 src/parquet/arrow/writer.cc                     | 162 +++++++++----------
 src/parquet/arrow/writer.h                      |   7 +-
 src/parquet/column/column-writer-test.cc        |   3 +-
 src/parquet/column/scanner-test.cc              |   3 +-
 src/parquet/encoding-internal.h                 |   4 +-
 src/parquet/util/bit-util.h                     |   6 +-
 9 files changed, 150 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/cmake_modules/ThirdpartyToolchain.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/ThirdpartyToolchain.cmake b/cmake_modules/ThirdpartyToolchain.cmake
index edccc3a..526e8c6 100644
--- a/cmake_modules/ThirdpartyToolchain.cmake
+++ b/cmake_modules/ThirdpartyToolchain.cmake
@@ -22,7 +22,7 @@ set(THRIFT_VERSION "0.9.1")
 
 # Brotli 0.5.2 does not install headers/libraries yet, but 0.6.0.dev does
 set(BROTLI_VERSION "5db62dcc9d386579609540cdf8869e95ad334bbd")
-set(ARROW_VERSION "5439b71586f4b0f9a36544b9e2417ee6ad7b48e8")
+set(ARROW_VERSION "0bdfd5efb2d7360f8ec8f6a65401d4c76a8df597")
 
 # find boost headers and libs
 set(Boost_DEBUG TRUE)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/arrow/arrow-reader-writer-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index 3f2a688..c4a4777 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -109,7 +109,7 @@ static void BM_WriteColumn(::benchmark::State& state) {
 
   while (state.KeepRunning()) {
     auto output = std::make_shared<InMemoryOutputStream>();
-    WriteTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE);
+    WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE);
   }
   SetBytesProcessed<nullable, ParquetType>(state);
 }
@@ -128,7 +128,7 @@ static void BM_ReadColumn(::benchmark::State& state) {
   std::vector<typename ParquetType::c_type> values(BENCHMARK_SIZE, 128);
   std::shared_ptr<::arrow::Table> table = TableFromVector<nullable, ParquetType>(values);
   auto output = std::make_shared<InMemoryOutputStream>();
-  WriteTable(table.get(), ::arrow::default_memory_pool(), output, BENCHMARK_SIZE);
+  WriteTable(*table, ::arrow::default_memory_pool(), output, BENCHMARK_SIZE);
   std::shared_ptr<Buffer> buffer = output->GetBuffer();
 
   while (state.KeepRunning()) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 63953ca..2dfdbd2 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -258,10 +258,12 @@ class TestParquetIO : public ::testing::Test {
     std::shared_ptr<Array> values;
     ASSERT_OK(NullableArray<TestType>(
         size * size, nullable_elements ? null_count : 0, kDefaultSeed, &values));
+    // Also test that slice offsets are respected
+    values = values->Slice(5, values->length() - 5);
     std::shared_ptr<ListArray> lists;
     ASSERT_OK(MakeListArary(
         values, size, nullable_lists ? null_count : 0, nullable_elements, &lists));
-    *out = MakeSimpleTable(lists, nullable_lists);
+    *out = MakeSimpleTable(lists->Slice(3, size - 6), nullable_lists);
   }
 
   void PrepareListOfListTable(int64_t size, bool nullable_parent_lists,
@@ -282,8 +284,8 @@ class TestParquetIO : public ::testing::Test {
   void WriteReadAndCheckSingleColumnTable(const std::shared_ptr<Table>& table) {
     std::shared_ptr<Array> values = table->column(0)->data()->chunk(0);
     this->sink_ = std::make_shared<InMemoryOutputStream>();
-    ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(),
-        this->sink_, values->length(), default_writer_properties()));
+    ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
+        values->length(), default_writer_properties()));
 
     this->ReadAndCheckSingleColumnTable(values);
   }
@@ -293,7 +295,7 @@ class TestParquetIO : public ::testing::Test {
       const std::shared_ptr<ArrayType>& values) {
     FileWriter writer(::arrow::default_memory_pool(), MakeWriter(schema));
     ASSERT_OK_NO_THROW(writer.NewRowGroup(values->length()));
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get()));
+    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*values));
     ASSERT_OK_NO_THROW(writer.Close());
   }
 
@@ -308,7 +310,8 @@ class TestParquetIO : public ::testing::Test {
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::Int32Type, ::arrow::UInt64Type,
     ::arrow::Int64Type, ::arrow::TimestampType, ::arrow::FloatType, ::arrow::DoubleType,
-    ::arrow::StringType, ::arrow::BinaryType> TestTypes;
+    ::arrow::StringType, ::arrow::BinaryType>
+    TestTypes;
 
 TYPED_TEST_CASE(TestParquetIO, TestTypes);
 
@@ -327,7 +330,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredWrite) {
   ASSERT_OK(NonNullArray<TypeParam>(SMALL_SIZE, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, false);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_,
+  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
       values->length(), default_writer_properties()));
 
   std::shared_ptr<Table> out;
@@ -354,6 +357,36 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalReadWrite) {
   this->ReadAndCheckSingleColumnFile(values.get());
 }
 
+TYPED_TEST(TestParquetIO, SingleColumnRequiredSliceWrite) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NonNullArray<TypeParam>(2 * SMALL_SIZE, &values));
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::REQUIRED);
+
+  std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
+  this->WriteColumn(schema, sliced_values);
+  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+
+  // Slice offset 1 higher
+  sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
+  this->WriteColumn(schema, sliced_values);
+  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+}
+
+TYPED_TEST(TestParquetIO, SingleColumnOptionalSliceWrite) {
+  std::shared_ptr<Array> values;
+  ASSERT_OK(NullableArray<TypeParam>(2 * SMALL_SIZE, SMALL_SIZE, kDefaultSeed, &values));
+  std::shared_ptr<GroupNode> schema = this->MakeSchema(Repetition::OPTIONAL);
+
+  std::shared_ptr<Array> sliced_values = values->Slice(SMALL_SIZE / 2, SMALL_SIZE);
+  this->WriteColumn(schema, sliced_values);
+  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+
+  // Slice offset 1 higher, thus different null bitmap.
+  sliced_values = values->Slice(SMALL_SIZE / 2 + 1, SMALL_SIZE);
+  this->WriteColumn(schema, sliced_values);
+  this->ReadAndCheckSingleColumnFile(sliced_values.get());
+}
+
 TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
   // This also tests max_definition_level = 1
   std::shared_ptr<Array> values;
@@ -402,7 +435,8 @@ TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
   FileWriter writer(default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
     ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get(), i * chunk_size, chunk_size));
+    std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
+    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
   }
   ASSERT_OK_NO_THROW(writer.Close());
 
@@ -415,7 +449,7 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWrite) {
   std::shared_ptr<Table> table = MakeSimpleTable(values, false);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
   ASSERT_OK_NO_THROW(WriteTable(
-      table.get(), default_memory_pool(), this->sink_, 512, default_writer_properties()));
+      *table, default_memory_pool(), this->sink_, 512, default_writer_properties()));
 
   this->ReadAndCheckSingleColumnTable(values);
 }
@@ -430,8 +464,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableRequiredChunkedWriteArrowIO) {
   {
     // BufferOutputStream closed on gc
     auto arrow_sink_ = std::make_shared<::arrow::io::BufferOutputStream>(buffer);
-    ASSERT_OK_NO_THROW(WriteTable(table.get(), default_memory_pool(), arrow_sink_, 512,
-        default_writer_properties()));
+    ASSERT_OK_NO_THROW(WriteTable(
+        *table, default_memory_pool(), arrow_sink_, 512, default_writer_properties()));
 
     // XXX: Remove this after ARROW-455 completed
     ASSERT_OK(arrow_sink_->Close());
@@ -462,7 +496,8 @@ TYPED_TEST(TestParquetIO, SingleColumnOptionalChunkedWrite) {
   FileWriter writer(::arrow::default_memory_pool(), this->MakeWriter(schema));
   for (int i = 0; i < 4; i++) {
     ASSERT_OK_NO_THROW(writer.NewRowGroup(chunk_size));
-    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(values.get(), i * chunk_size, chunk_size));
+    std::shared_ptr<Array> sliced_array = values->Slice(i * chunk_size, chunk_size);
+    ASSERT_OK_NO_THROW(writer.WriteColumnChunk(*sliced_array));
   }
   ASSERT_OK_NO_THROW(writer.Close());
 
@@ -476,8 +511,8 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalChunkedWrite) {
   ASSERT_OK(NullableArray<TypeParam>(LARGE_SIZE, 100, kDefaultSeed, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, true);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_,
-      512, default_writer_properties()));
+  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512,
+      default_writer_properties()));
 
   this->ReadAndCheckSingleColumnTable(values);
 }
@@ -550,7 +585,7 @@ TEST_F(TestUInt32ParquetIO, Parquet_2_0_Compability) {
           .version(ParquetVersion::PARQUET_2_0)
           ->build();
   ASSERT_OK_NO_THROW(
-      WriteTable(table.get(), default_memory_pool(), this->sink_, 512, properties));
+      WriteTable(*table, default_memory_pool(), this->sink_, 512, properties));
   this->ReadAndCheckSingleColumnTable(values);
 }
 
@@ -571,8 +606,8 @@ TEST_F(TestUInt32ParquetIO, Parquet_1_0_Compability) {
       ::parquet::WriterProperties::Builder()
           .version(ParquetVersion::PARQUET_1_0)
           ->build();
-  ASSERT_OK_NO_THROW(WriteTable(
-      table.get(), ::arrow::default_memory_pool(), this->sink_, 512, properties));
+  ASSERT_OK_NO_THROW(
+      WriteTable(*table, ::arrow::default_memory_pool(), this->sink_, 512, properties));
 
   std::shared_ptr<Array> expected_values;
   std::shared_ptr<PoolBuffer> int64_data =
@@ -606,7 +641,7 @@ TEST_F(TestStringParquetIO, EmptyStringColumnRequiredWrite) {
   ASSERT_OK(builder.Finish(&values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, false);
   this->sink_ = std::make_shared<InMemoryOutputStream>();
-  ASSERT_OK_NO_THROW(WriteTable(table.get(), ::arrow::default_memory_pool(), this->sink_,
+  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
       values->length(), default_writer_properties()));
 
   std::shared_ptr<Table> out;
@@ -692,8 +727,8 @@ class TestPrimitiveParquetIO : public TestParquetIO<TestType> {
 
 typedef ::testing::Types<::arrow::BooleanType, ::arrow::UInt8Type, ::arrow::Int8Type,
     ::arrow::UInt16Type, ::arrow::Int16Type, ::arrow::UInt32Type, ::arrow::Int32Type,
-    ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType,
-    ::arrow::DoubleType> PrimitiveTestTypes;
+    ::arrow::UInt64Type, ::arrow::Int64Type, ::arrow::FloatType, ::arrow::DoubleType>
+    PrimitiveTestTypes;
 
 TYPED_TEST_CASE(TestPrimitiveParquetIO, PrimitiveTestTypes);
 
@@ -738,7 +773,7 @@ void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
   auto sink = std::make_shared<InMemoryOutputStream>();
 
   ASSERT_OK_NO_THROW(WriteTable(
-      table.get(), ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));
+      *table, ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));
 
   std::shared_ptr<Buffer> buffer = sink->GetBuffer();
   std::unique_ptr<FileReader> reader;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/arrow/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.cc b/src/parquet/arrow/writer.cc
index 0be6b69..90e037f 100644
--- a/src/parquet/arrow/writer.cc
+++ b/src/parquet/arrow/writer.cc
@@ -59,6 +59,7 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
 
 #define PRIMITIVE_VISIT(ArrowTypePrefix)                                \
   Status Visit(const ::arrow::ArrowTypePrefix##Array& array) override { \
+    array_offsets_.push_back(array.offset());                           \
     valid_bitmaps_.push_back(array.null_bitmap_data());                 \
     null_counts_.push_back(array.null_count());                         \
     values_type_ = array.type_enum();                                   \
@@ -86,12 +87,13 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
   PRIMITIVE_VISIT(Interval)
 
   Status Visit(const ListArray& array) override {
+    array_offsets_.push_back(array.offset());
     valid_bitmaps_.push_back(array.null_bitmap_data());
     null_counts_.push_back(array.null_count());
     offsets_.push_back(array.raw_value_offsets());
 
-    min_offset_idx_ = array.raw_value_offsets()[min_offset_idx_];
-    max_offset_idx_ = array.raw_value_offsets()[max_offset_idx_];
+    min_offset_idx_ = array.value_offset(min_offset_idx_);
+    max_offset_idx_ = array.value_offset(max_offset_idx_);
 
     return array.values()->Accept(this);
   }
@@ -108,15 +110,14 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
   NOT_IMPLEMENTED_VIST(Decimal)
   NOT_IMPLEMENTED_VIST(Dictionary)
 
-  Status GenerateLevels(const Array* array, int64_t offset, int64_t length,
-      const std::shared_ptr<Field>& field, int64_t* values_offset,
-      ::arrow::Type::type* values_type, int64_t* num_values, int64_t* num_levels,
-      std::shared_ptr<Buffer>* def_levels, std::shared_ptr<Buffer>* rep_levels,
-      const Array** values_array) {
+  Status GenerateLevels(const Array& array, const std::shared_ptr<Field>& field,
+      int64_t* values_offset, ::arrow::Type::type* values_type, int64_t* num_values,
+      int64_t* num_levels, std::shared_ptr<Buffer>* def_levels,
+      std::shared_ptr<Buffer>* rep_levels, const Array** values_array) {
     // Work downwards to extract bitmaps and offsets
-    min_offset_idx_ = offset;
-    max_offset_idx_ = offset + length;
-    RETURN_NOT_OK(array->Accept(this));
+    min_offset_idx_ = 0;
+    max_offset_idx_ = array.length();
+    RETURN_NOT_OK(array.Accept(this));
     *num_values = max_offset_idx_ - min_offset_idx_;
     *values_offset = min_offset_idx_;
     *values_type = values_type_;
@@ -140,15 +141,15 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
       // We have a PrimitiveArray
       *rep_levels = nullptr;
       if (nullable_[0]) {
-        RETURN_NOT_OK(def_levels_buffer_->Resize(length * sizeof(int16_t)));
+        RETURN_NOT_OK(def_levels_buffer_->Resize(array.length() * sizeof(int16_t)));
         auto def_levels_ptr =
             reinterpret_cast<int16_t*>(def_levels_buffer_->mutable_data());
-        if (array->null_count() == 0) {
-          std::fill(def_levels_ptr, def_levels_ptr + length, 1);
+        if (array.null_count() == 0) {
+          std::fill(def_levels_ptr, def_levels_ptr + array.length(), 1);
         } else {
-          const uint8_t* valid_bits = array->null_bitmap_data();
-          INIT_BITSET(valid_bits, offset);
-          for (int i = 0; i < length; i++) {
+          const uint8_t* valid_bits = array.null_bitmap_data();
+          INIT_BITSET(valid_bits, array.offset());
+          for (int i = 0; i < array.length(); i++) {
             if (bitset_valid_bits & (1 << bit_offset_valid_bits)) {
               def_levels_ptr[i] = 1;
             } else {
@@ -161,10 +162,10 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
       } else {
         *def_levels = nullptr;
       }
-      *num_levels = length;
+      *num_levels = array.length();
     } else {
       RETURN_NOT_OK(rep_levels_.Append(0));
-      HandleListEntries(0, 0, offset, length);
+      HandleListEntries(0, 0, 0, array.length());
 
       std::shared_ptr<Array> def_levels_array;
       RETURN_NOT_OK(def_levels_.Finish(&def_levels_array));
@@ -182,7 +183,7 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
   Status HandleList(int16_t def_level, int16_t rep_level, int64_t index) {
     if (nullable_[rep_level]) {
       if (null_counts_[rep_level] == 0 ||
-          BitUtil::GetBit(valid_bitmaps_[rep_level], index)) {
+          BitUtil::GetBit(valid_bitmaps_[rep_level], index + array_offsets_[rep_level])) {
         return HandleNonNullList(def_level + 1, rep_level, index);
       } else {
         return def_levels_.Append(def_level);
@@ -205,7 +206,8 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
         if (i > 0) { RETURN_NOT_OK(rep_levels_.Append(rep_level + 1)); }
         if (nullable_[recursion_level] &&
             ((null_counts_[recursion_level] == 0) ||
-                BitUtil::GetBit(valid_bitmaps_[recursion_level], inner_offset + i))) {
+                BitUtil::GetBit(valid_bitmaps_[recursion_level],
+                    inner_offset + i + array_offsets_[recursion_level]))) {
           RETURN_NOT_OK(def_levels_.Append(def_level + 2));
         } else {
           // This can be produced in two case:
@@ -236,6 +238,7 @@ class LevelBuilder : public ::arrow::ArrayVisitor {
   std::vector<int64_t> null_counts_;
   std::vector<const uint8_t*> valid_bitmaps_;
   std::vector<const int32_t*> offsets_;
+  std::vector<int32_t> array_offsets_;
   std::vector<bool> nullable_;
 
   int32_t min_offset_idx_;
@@ -250,9 +253,8 @@ class FileWriter::Impl {
 
   Status NewRowGroup(int64_t chunk_size);
   template <typename ParquetType, typename ArrowType>
-  Status TypedWriteBatch(ColumnWriter* writer, const Array* data, int64_t offset,
-      int64_t num_values, int64_t num_levels, const int16_t* def_levels,
-      const int16_t* rep_levels);
+  Status TypedWriteBatch(ColumnWriter* writer, const std::shared_ptr<Array>& data,
+      int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels);
 
   template <typename ParquetType, typename ArrowType>
   Status WriteNullableBatch(TypedColumnWriter<ParquetType>* writer, int64_t num_values,
@@ -288,7 +290,7 @@ class FileWriter::Impl {
     return Status::OK();
   }
 
-  Status WriteColumnChunk(const Array* data, int64_t offset, int64_t length);
+  Status WriteColumnChunk(const Array& data);
   Status Close();
 
   virtual ~Impl() {}
@@ -317,28 +319,28 @@ Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
 }
 
 template <typename ParquetType, typename ArrowType>
-Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer, const Array* array,
-    int64_t offset, int64_t num_values, int64_t num_levels, const int16_t* def_levels,
+Status FileWriter::Impl::TypedWriteBatch(ColumnWriter* column_writer,
+    const std::shared_ptr<Array>& array, int64_t num_levels, const int16_t* def_levels,
     const int16_t* rep_levels) {
   using ArrowCType = typename ArrowType::c_type;
   using ParquetCType = typename ParquetType::c_type;
 
-  DCHECK((offset + num_values) <= array->length());
-  auto data = static_cast<const PrimitiveArray*>(array);
-  auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data()) + offset;
+  auto data = static_cast<const PrimitiveArray*>(array.get());
+  auto data_ptr = reinterpret_cast<const ArrowCType*>(data->data()->data());
   auto writer = reinterpret_cast<TypedColumnWriter<ParquetType>*>(column_writer);
 
   if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
     // no nulls, just dump the data
     const ParquetCType* data_writer_ptr = nullptr;
     RETURN_NOT_OK((ConvertPhysicalType<ArrowCType, ParquetCType>(
-        data_ptr, num_values, &data_writer_ptr)));
+        data_ptr + data->offset(), array->length(), &data_writer_ptr)));
     PARQUET_CATCH_NOT_OK(
         writer->WriteBatch(num_levels, def_levels, rep_levels, data_writer_ptr));
   } else {
     const uint8_t* valid_bits = data->null_bitmap_data();
-    RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, num_values,
-        num_levels, def_levels, rep_levels, valid_bits, offset, data_ptr)));
+    RETURN_NOT_OK((WriteNullableBatch<ParquetType, ArrowType>(writer, data->length(),
+        num_levels, def_levels, rep_levels, valid_bits, data->offset(),
+        data_ptr + data->offset())));
   }
   PARQUET_CATCH_NOT_OK(writer->Close());
   return Status::OK();
@@ -389,18 +391,18 @@ NULLABLE_BATCH_FAST_PATH(DoubleType, ::arrow::DoubleType, double)
 //   ArrowType::c_type to ParquetType::c_type
 template <>
 Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
-    ColumnWriter* column_writer, const Array* array, int64_t offset, int64_t num_values,
-    int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) {
-  DCHECK((offset + num_values) <= array->length());
-  RETURN_NOT_OK(data_buffer_.Resize(num_values));
-  auto data = static_cast<const BooleanArray*>(array);
+    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels) {
+  RETURN_NOT_OK(data_buffer_.Resize(array->length()));
+  auto data = static_cast<const BooleanArray*>(array.get());
   auto data_ptr = reinterpret_cast<const uint8_t*>(data->data()->data());
   auto buffer_ptr = reinterpret_cast<bool*>(data_buffer_.mutable_data());
   auto writer = reinterpret_cast<TypedColumnWriter<BooleanType>*>(column_writer);
 
   int buffer_idx = 0;
-  for (int i = 0; i < num_values; i++) {
-    if (!data->IsNull(offset + i)) {
+  int32_t offset = array->offset();
+  for (int i = 0; i < data->length(); i++) {
+    if (!data->IsNull(i)) {
       buffer_ptr[buffer_idx++] = BitUtil::GetBit(data_ptr, offset + i);
     }
   }
@@ -412,11 +414,10 @@ Status FileWriter::Impl::TypedWriteBatch<BooleanType, ::arrow::BooleanType>(
 
 template <>
 Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
-    ColumnWriter* column_writer, const Array* array, int64_t offset, int64_t num_values,
-    int64_t num_levels, const int16_t* def_levels, const int16_t* rep_levels) {
-  DCHECK((offset + num_values) <= array->length());
-  RETURN_NOT_OK(data_buffer_.Resize(num_values * sizeof(ByteArray)));
-  auto data = static_cast<const BinaryArray*>(array);
+    ColumnWriter* column_writer, const std::shared_ptr<Array>& array, int64_t num_levels,
+    const int16_t* def_levels, const int16_t* rep_levels) {
+  RETURN_NOT_OK(data_buffer_.Resize(array->length() * sizeof(ByteArray)));
+  auto data = static_cast<const BinaryArray*>(array.get());
   auto buffer_ptr = reinterpret_cast<ByteArray*>(data_buffer_.mutable_data());
   // In the case of an array consisting of only empty strings or all null,
   // data->data() points already to a nullptr, thus data->data()->data() will
@@ -427,21 +428,22 @@ Status FileWriter::Impl::TypedWriteBatch<ByteArrayType, ::arrow::BinaryType>(
     DCHECK(data_ptr != nullptr);
   }
   auto writer = reinterpret_cast<TypedColumnWriter<ByteArrayType>*>(column_writer);
+  const int32_t* value_offset = data->raw_value_offsets();
 
   if (writer->descr()->schema_node()->is_required() || (data->null_count() == 0)) {
     // no nulls, just dump the data
-    for (int64_t i = 0; i < num_values; i++) {
+    for (int64_t i = 0; i < data->length(); i++) {
       buffer_ptr[i] =
-          ByteArray(data->value_length(i + offset), data_ptr + data->value_offset(i));
+          ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
     }
     PARQUET_CATCH_NOT_OK(
         writer->WriteBatch(num_levels, def_levels, rep_levels, buffer_ptr));
   } else {
     int buffer_idx = 0;
-    for (int64_t i = 0; i < num_values; i++) {
-      if (!data->IsNull(offset + i)) {
-        buffer_ptr[buffer_idx++] = ByteArray(
-            data->value_length(i + offset), data_ptr + data->value_offset(i + offset));
+    for (int64_t i = 0; i < data->length(); i++) {
+      if (!data->IsNull(i)) {
+        buffer_ptr[buffer_idx++] =
+            ByteArray(value_offset[i + 1] - value_offset[i], data_ptr + value_offset[i]);
       }
     }
     PARQUET_CATCH_NOT_OK(
@@ -464,11 +466,9 @@ Status FileWriter::NewRowGroup(int64_t chunk_size) {
   return impl_->NewRowGroup(chunk_size);
 }
 
-Status FileWriter::Impl::WriteColumnChunk(
-    const Array* data, int64_t offset, int64_t length) {
+Status FileWriter::Impl::WriteColumnChunk(const Array& data) {
   ColumnWriter* column_writer;
   PARQUET_CATCH_NOT_OK(column_writer = row_group_writer_->NextColumn());
-  DCHECK((offset + length) <= data->length());
 
   int current_column_idx = row_group_writer_->current_column();
   std::shared_ptr<::arrow::Schema> arrow_schema;
@@ -481,10 +481,10 @@ Status FileWriter::Impl::WriteColumnChunk(
   ::arrow::Type::type values_type;
   int64_t num_levels;
   int64_t num_values;
-  const Array* values_array;
-  RETURN_NOT_OK(level_builder.GenerateLevels(data, offset, length, arrow_schema->field(0),
-      &values_offset, &values_type, &num_values, &num_levels, &def_levels_buffer,
-      &rep_levels_buffer, &values_array));
+  const Array* _values_array;
+  RETURN_NOT_OK(level_builder.GenerateLevels(data, arrow_schema->field(0), &values_offset,
+      &values_type, &num_values, &num_levels, &def_levels_buffer, &rep_levels_buffer,
+      &_values_array));
   const int16_t* def_levels = nullptr;
   if (def_levels_buffer) {
     def_levels = reinterpret_cast<const int16_t*>(def_levels_buffer->data());
@@ -493,11 +493,12 @@ Status FileWriter::Impl::WriteColumnChunk(
   if (rep_levels_buffer) {
     rep_levels = reinterpret_cast<const int16_t*>(rep_levels_buffer->data());
   }
+  std::shared_ptr<Array> values_array = _values_array->Slice(values_offset, num_values);
 
-#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType)                              \
-  case ::arrow::Type::ArrowEnum:                                                         \
-    return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(column_writer, values_array, \
-        values_offset, num_values, num_levels, def_levels, rep_levels);                  \
+#define WRITE_BATCH_CASE(ArrowEnum, ArrowType, ParquetType)               \
+  case ::arrow::Type::ArrowEnum:                                          \
+    return TypedWriteBatch<ParquetType, ::arrow::ArrowType>(              \
+        column_writer, values_array, num_levels, def_levels, rep_levels); \
     break;
 
   switch (values_type) {
@@ -505,11 +506,11 @@ Status FileWriter::Impl::WriteColumnChunk(
       if (writer_->properties()->version() == ParquetVersion::PARQUET_1_0) {
         // Parquet 1.0 reader cannot read the UINT_32 logical type. Thus we need
         // to use the larger Int64Type to store them lossless.
-        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(column_writer,
-            values_array, values_offset, num_values, num_levels, def_levels, rep_levels);
+        return TypedWriteBatch<Int64Type, ::arrow::UInt32Type>(
+            column_writer, values_array, num_levels, def_levels, rep_levels);
       } else {
-        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(column_writer,
-            values_array, values_offset, num_values, num_levels, def_levels, rep_levels);
+        return TypedWriteBatch<Int32Type, ::arrow::UInt32Type>(
+            column_writer, values_array, num_levels, def_levels, rep_levels);
       }
     }
       WRITE_BATCH_CASE(BOOL, BooleanType, BooleanType)
@@ -536,11 +537,8 @@ Status FileWriter::Impl::WriteColumnChunk(
   return Status::OK();
 }
 
-Status FileWriter::WriteColumnChunk(
-    const ::arrow::Array* array, int64_t offset, int64_t length) {
-  int64_t real_length = length;
-  if (length == -1) { real_length = array->length(); }
-  return impl_->WriteColumnChunk(array, offset, real_length);
+Status FileWriter::WriteColumnChunk(const ::arrow::Array& array) {
+  return impl_->WriteColumnChunk(array);
 }
 
 Status FileWriter::Close() {
@@ -553,39 +551,39 @@ MemoryPool* FileWriter::memory_pool() const {
 
 FileWriter::~FileWriter() {}
 
-Status WriteTable(const Table* table, MemoryPool* pool,
+Status WriteTable(const Table& table, MemoryPool* pool,
     const std::shared_ptr<OutputStream>& sink, int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties) {
   std::shared_ptr<SchemaDescriptor> parquet_schema;
-  RETURN_NOT_OK(
-      ToParquetSchema(table->schema().get(), *properties.get(), &parquet_schema));
+  RETURN_NOT_OK(ToParquetSchema(table.schema().get(), *properties, &parquet_schema));
   auto schema_node = std::static_pointer_cast<GroupNode>(parquet_schema->schema_root());
   std::unique_ptr<ParquetFileWriter> parquet_writer =
       ParquetFileWriter::Open(sink, schema_node, properties);
   FileWriter writer(pool, std::move(parquet_writer));
 
   // TODO(ARROW-232) Support writing chunked arrays.
-  for (int i = 0; i < table->num_columns(); i++) {
-    if (table->column(i)->data()->num_chunks() != 1) {
+  for (int i = 0; i < table.num_columns(); i++) {
+    if (table.column(i)->data()->num_chunks() != 1) {
       return Status::NotImplemented("No support for writing chunked arrays yet.");
     }
   }
 
-  for (int chunk = 0; chunk * chunk_size < table->num_rows(); chunk++) {
+  for (int chunk = 0; chunk * chunk_size < table.num_rows(); chunk++) {
     int64_t offset = chunk * chunk_size;
-    int64_t size = std::min(chunk_size, table->num_rows() - offset);
+    int64_t size = std::min(chunk_size, table.num_rows() - offset);
     RETURN_NOT_OK_ELSE(writer.NewRowGroup(size), PARQUET_IGNORE_NOT_OK(writer.Close()));
-    for (int i = 0; i < table->num_columns(); i++) {
-      std::shared_ptr<Array> array = table->column(i)->data()->chunk(0);
-      RETURN_NOT_OK_ELSE(writer.WriteColumnChunk(array.get(), offset, size),
-          PARQUET_IGNORE_NOT_OK(writer.Close()));
+    for (int i = 0; i < table.num_columns(); i++) {
+      std::shared_ptr<Array> array = table.column(i)->data()->chunk(0);
+      array = array->Slice(offset, size);
+      RETURN_NOT_OK_ELSE(
+          writer.WriteColumnChunk(*array), PARQUET_IGNORE_NOT_OK(writer.Close()));
     }
   }
 
   return writer.Close();
 }
 
-Status WriteTable(const Table* table, MemoryPool* pool,
+Status WriteTable(const Table& table, MemoryPool* pool,
     const std::shared_ptr<::arrow::io::OutputStream>& sink, int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties) {
   auto wrapper = std::make_shared<ArrowOutputStream>(sink);
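
The invariant behind the writer.cc changes above: slicing never copies or
re-aligns the null bitmap, so element i of a sliced array lives at bit
array.offset() + i of the bitmap it shares with its parent. A standalone
illustration of that indexing (not part of the commit):

    #include <cstdint>

    #include <arrow/api.h>
    #include <arrow/util/bit-util.h>

    // Count the non-null elements of a possibly-sliced array, shifting every
    // bitmap lookup by the slice offset -- the same adjustment LevelBuilder
    // and TypedWriteBatch apply in the diff above.
    int64_t CountValid(const ::arrow::Array& array) {
      const uint8_t* valid_bits = array.null_bitmap_data();
      if (valid_bits == nullptr) { return array.length(); }  // no null entries
      int64_t count = 0;
      for (int64_t i = 0; i < array.length(); ++i) {
        if (::arrow::BitUtil::GetBit(valid_bits, array.offset() + i)) { ++count; }
      }
      return count;
    }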

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/arrow/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/writer.h b/src/parquet/arrow/writer.h
index 4a39c99..e3b281b 100644
--- a/src/parquet/arrow/writer.h
+++ b/src/parquet/arrow/writer.h
@@ -50,8 +50,7 @@ class PARQUET_EXPORT FileWriter {
   FileWriter(::arrow::MemoryPool* pool, std::unique_ptr<ParquetFileWriter> writer);
 
   ::arrow::Status NewRowGroup(int64_t chunk_size);
-  ::arrow::Status WriteColumnChunk(
-      const ::arrow::Array* data, int64_t offset = 0, int64_t length = -1);
+  ::arrow::Status WriteColumnChunk(const ::arrow::Array& data);
   ::arrow::Status Close();
 
   virtual ~FileWriter();
@@ -68,12 +67,12 @@ class PARQUET_EXPORT FileWriter {
  *
  * The table shall only consist of columns of primitive type or of primitive lists.
  */
-::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table* table,
+::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table& table,
     ::arrow::MemoryPool* pool, const std::shared_ptr<OutputStream>& sink,
     int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
 
-::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table* table,
+::arrow::Status PARQUET_EXPORT WriteTable(const ::arrow::Table& table,
    ::arrow::MemoryPool* pool, const std::shared_ptr<::arrow::io::OutputStream>& sink,
     int64_t chunk_size,
     const std::shared_ptr<WriterProperties>& properties = default_writer_properties());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 19a7c49..dedb2c2 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -244,7 +244,8 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio
 }
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    BooleanType, ByteArrayType, FLBAType> TestTypes;
+    BooleanType, ByteArrayType, FLBAType>
+    TestTypes;
 
 TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/column/scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner-test.cc b/src/parquet/column/scanner-test.cc
index fa31e62..5d137b7 100644
--- a/src/parquet/column/scanner-test.cc
+++ b/src/parquet/column/scanner-test.cc
@@ -145,7 +145,8 @@ static int num_pages = 20;
 static int batch_size = 32;
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    ByteArrayType> TestTypes;
+    ByteArrayType>
+    TestTypes;
 
 using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
 using TestFLBAFlatScanner = TestFlatScanner<FLBAType>;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/encoding-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index ad4a78f..67cd7ba 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -26,12 +26,12 @@
 
 #include <arrow/util/bit-util.h>
 
-#include "parquet/exception.h"
 #include "parquet/encoding.h"
+#include "parquet/exception.h"
 #include "parquet/schema.h"
 #include "parquet/types.h"
-#include "parquet/util/bit-util.h"
 #include "parquet/util/bit-stream-utils.inline.h"
+#include "parquet/util/bit-util.h"
 #include "parquet/util/cpu-info.h"
 #include "parquet/util/hash-util.h"
 #include "parquet/util/memory.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b2c734b8/src/parquet/util/bit-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-util.h b/src/parquet/util/bit-util.h
index d1e81a3..56d6c03 100644
--- a/src/parquet/util/bit-util.h
+++ b/src/parquet/util/bit-util.h
@@ -32,9 +32,9 @@
 
 namespace parquet {
 
-#define INIT_BITSET(valid_bits_vector, valid_bits_index)      \
-  int byte_offset_##valid_bits_vector = valid_bits_index / 8; \
-  int bit_offset_##valid_bits_vector = valid_bits_index % 8;  \
+#define INIT_BITSET(valid_bits_vector, valid_bits_index)        \
+  int byte_offset_##valid_bits_vector = (valid_bits_index) / 8; \
+  int bit_offset_##valid_bits_vector = (valid_bits_index) % 8;  \
   uint8_t bitset_##valid_bits_vector = valid_bits_vector[byte_offset_##valid_bits_vector];
 
 #define READ_NEXT_BITSET(valid_bits_vector)                                          \

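The INIT_BITSET change in bit-util.h is the usual macro-hygiene fix:
parenthesizing the parameter so that expression arguments divide as a unit.
A tiny self-contained illustration with hypothetical macros:

    #include <cstdio>

    #define BYTE_INDEX_OLD(i) i / 8    // unparenthesized, like the old macro
    #define BYTE_INDEX_NEW(i) (i) / 8  // parenthesized, as in this commit

    int main() {
      int offset = 6, i = 4;
      // Expands to offset + i / 8 == 6 + 0 == 6: '/' binds before '+'.
      std::printf("%d\n", BYTE_INDEX_OLD(offset + i));
      // Expands to (offset + i) / 8 == 10 / 8 == 1, the intended byte index.
      std::printf("%d\n", BYTE_INDEX_NEW(offset + i));
      return 0;
    }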
