arrow-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject arrow git commit: ARROW-201: [C++] Initial ParquetWriter implementation
Date Wed, 18 May 2016 17:51:08 GMT
Repository: arrow
Updated Branches:
  refs/heads/master 978de1a94 -> e0fb3698e


ARROW-201: [C++] Initial ParquetWriter implementation

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #78 from xhochy/arrow-201 and squashes the following commits:

5d95099 [Uwe L. Korn] Add check for flat column
88ae3ca [Uwe L. Korn] Install arrow_parquet headers
f81021b [Uwe L. Korn] Incorporate reader comments
ba240e8 [Uwe L. Korn] Incorporate writer comments
2179c0e [Uwe L. Korn] Infer c-type from ArrowType
efd46fb [Uwe L. Korn] Infer c-type from ArrowType
77386ea [Uwe L. Korn] Templatize test functions
1aa7698 [Uwe L. Korn] Add comment to helper function
8fdd4c8 [Uwe L. Korn] Parameterize schema creation
8e8d7d7 [Uwe L. Korn] ARROW-201: [C++] Initial ParquetWriter implementation


Project: http://git-wip-us.apache.org/repos/asf/arrow/repo
Commit: http://git-wip-us.apache.org/repos/asf/arrow/commit/e0fb3698
Tree: http://git-wip-us.apache.org/repos/asf/arrow/tree/e0fb3698
Diff: http://git-wip-us.apache.org/repos/asf/arrow/diff/e0fb3698

Branch: refs/heads/master
Commit: e0fb3698e5602bccaee232d4c259b3df089886e6
Parents: 978de1a
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Wed May 18 10:49:04 2016 -0700
Committer: Wes McKinney <wesm@apache.org>
Committed: Wed May 18 10:49:04 2016 -0700

----------------------------------------------------------------------
 cpp/src/arrow/parquet/CMakeLists.txt         |   6 +-
 cpp/src/arrow/parquet/parquet-io-test.cc     | 222 ++++++++++++++++++++++
 cpp/src/arrow/parquet/parquet-reader-test.cc | 116 -----------
 cpp/src/arrow/parquet/reader.cc              |  79 +++++---
 cpp/src/arrow/parquet/writer.cc              | 148 +++++++++++++++
 cpp/src/arrow/parquet/writer.h               |  59 ++++++
 6 files changed, 485 insertions(+), 145 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/CMakeLists.txt b/cpp/src/arrow/parquet/CMakeLists.txt
index cd6f05d..c00cc9f 100644
--- a/cpp/src/arrow/parquet/CMakeLists.txt
+++ b/cpp/src/arrow/parquet/CMakeLists.txt
@@ -21,6 +21,7 @@
 set(PARQUET_SRCS
   reader.cc
   schema.cc
+  writer.cc
 )
 
 set(PARQUET_LIBS
@@ -37,14 +38,15 @@ SET_TARGET_PROPERTIES(arrow_parquet PROPERTIES LINKER_LANGUAGE CXX)
 ADD_ARROW_TEST(parquet-schema-test)
 ARROW_TEST_LINK_LIBRARIES(parquet-schema-test arrow_parquet)
 
-ADD_ARROW_TEST(parquet-reader-test)
-ARROW_TEST_LINK_LIBRARIES(parquet-reader-test arrow_parquet)
+# Round-trip tests: files written via FileWriter / parquet-cpp, read back via FileReader.
+ADD_ARROW_TEST(parquet-io-test)
+ARROW_TEST_LINK_LIBRARIES(parquet-io-test arrow_parquet)
 
 # Headers: top level
 install(FILES
   reader.h
   schema.h
   utils.h
+  writer.h
   DESTINATION include/arrow/parquet)
 
 install(TARGETS arrow_parquet

http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/parquet-io-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-io-test.cc b/cpp/src/arrow/parquet/parquet-io-test.cc
new file mode 100644
index 0000000..845574d
--- /dev/null
+++ b/cpp/src/arrow/parquet/parquet-io-test.cc
@@ -0,0 +1,222 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "gtest/gtest.h"
+
+#include "arrow/test-util.h"
+#include "arrow/parquet/reader.h"
+#include "arrow/parquet/writer.h"
+#include "arrow/types/primitive.h"
+#include "arrow/util/memory-pool.h"
+#include "arrow/util/status.h"
+
+#include "parquet/api/reader.h"
+#include "parquet/api/writer.h"
+
+using ParquetBuffer = parquet::Buffer;
+using parquet::BufferReader;
+using parquet::InMemoryOutputStream;
+using parquet::ParquetFileReader;
+using parquet::ParquetFileWriter;
+using parquet::RandomAccessSource;
+using parquet::Repetition;
+using parquet::SchemaDescriptor;
+using ParquetType = parquet::Type;
+using parquet::schema::GroupNode;
+using parquet::schema::NodePtr;
+using parquet::schema::PrimitiveNode;
+
+namespace arrow {
+
+namespace parquet {
+
+// Builds an array of `size` copies of `value`. Unlike NullableArray, no validity
+// bytes are handed to the builder, so every slot is expected to be valid.
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NonNullArray(
+    size_t size, typename ArrowType::c_type value) {
+  std::vector<typename ArrowType::c_type> repeated(size, value);
+  auto arrow_type = std::make_shared<ArrowType>();
+  NumericBuilder<ArrowType> array_builder(default_memory_pool(), arrow_type);
+  array_builder.Append(repeated.data(), repeated.size());
+  auto built = array_builder.Finish();
+  return std::static_pointer_cast<PrimitiveArray>(built);
+}
+
+// Builds an array of `size` copies of `value` in which the first `num_nulls`
+// even-indexed slots (0, 2, 4, ...) are marked null. At most (size + 1) / 2
+// nulls can be placed this way; a larger `num_nulls` is clamped to that
+// maximum instead of writing past the end of valid_bytes.
+template <typename ArrowType>
+std::shared_ptr<PrimitiveArray> NullableArray(
+    size_t size, typename ArrowType::c_type value, size_t num_nulls) {
+  std::vector<typename ArrowType::c_type> values(size, value);
+  std::vector<uint8_t> valid_bytes(size, 1);
+
+  for (size_t i = 0; i < num_nulls && i * 2 < size; i++) {
+    valid_bytes[i * 2] = 0;
+  }
+
+  NumericBuilder<ArrowType> builder(default_memory_pool(), std::make_shared<ArrowType>());
+  builder.Append(values.data(), values.size(), valid_bytes.data());
+  return std::static_pointer_cast<PrimitiveArray>(builder.Finish());
+}
+
+// Fixture that writes single-column Parquet files into an in-memory sink and
+// reads them back through the Arrow adapter.
+class TestParquetIO : public ::testing::Test {
+ public:
+  virtual void SetUp() {}
+
+  // Schema with one primitive leaf "column1" of the given physical type and
+  // repetition, under a REQUIRED root group named "schema".
+  std::shared_ptr<GroupNode> Schema(
+      ParquetType::type parquet_type, Repetition::type repetition) {
+    auto pnode = PrimitiveNode::Make("column1", repetition, parquet_type);
+    NodePtr node =
+        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
+    return std::static_pointer_cast<GroupNode>(node);
+  }
+
+  // Opens a ParquetFileWriter on a fresh in-memory sink. The sink is kept in
+  // sink_ so that ReaderFromSink() can read the written bytes back.
+  std::unique_ptr<ParquetFileWriter> MakeWriter(std::shared_ptr<GroupNode>& schema) {
+    sink_ = std::make_shared<InMemoryOutputStream>();
+    return ParquetFileWriter::Open(sink_, schema);
+  }
+
+  // Opens a ParquetFileReader over the bytes written to the most recent sink.
+  std::unique_ptr<ParquetFileReader> ReaderFromSink() {
+    std::shared_ptr<ParquetBuffer> buffer = sink_->GetBuffer();
+    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+    return ParquetFileReader::Open(std::move(source));
+  }
+
+  // Reads a batch of up to 100 values from column 0 into *out. This helper
+  // reports problems with ASSERT_*, which only returns from the helper, so
+  // callers should wrap it in ASSERT_NO_FATAL_FAILURE.
+  void ReadSingleColumnFile(
+      std::unique_ptr<ParquetFileReader> file_reader, std::shared_ptr<Array>* out) {
+    arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
+    std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
+    ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
+    ASSERT_NE(nullptr, column_reader.get());
+    ASSERT_OK(column_reader->NextBatch(100, out));
+    ASSERT_NE(nullptr, out->get());
+  }
+
+  // Writes `values` with parquet-cpp directly as a REQUIRED INT64 column split
+  // into `num_chunks` (> 0) row groups, then returns a reader over the result.
+  // The last row group also takes any remainder, so no values are dropped when
+  // values.size() is not a multiple of num_chunks.
+  std::unique_ptr<ParquetFileReader> Int64File(
+      std::vector<int64_t>& values, int num_chunks) {
+    std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+    std::unique_ptr<ParquetFileWriter> file_writer = MakeWriter(schema);
+    size_t chunk_size = values.size() / num_chunks;
+    for (int i = 0; i < num_chunks; i++) {
+      size_t offset = i * chunk_size;
+      size_t rows = (i == num_chunks - 1) ? values.size() - offset : chunk_size;
+      auto row_group_writer = file_writer->AppendRowGroup(rows);
+      auto column_writer =
+          static_cast<::parquet::Int64Writer*>(row_group_writer->NextColumn());
+      column_writer->WriteBatch(rows, nullptr, nullptr, values.data() + offset);
+      column_writer->Close();
+      row_group_writer->Close();
+    }
+    file_writer->Close();
+    return ReaderFromSink();
+  }
+
+ private:
+  std::shared_ptr<InMemoryOutputStream> sink_;
+};
+
+// Reads an INT64 column written directly with parquet-cpp as one row group.
+TEST_F(TestParquetIO, SingleColumnInt64Read) {
+  std::vector<int64_t> values(100, 128);
+  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1);
+
+  std::shared_ptr<Array> out;
+  // ASSERT_* inside the helper only returns from the helper; propagate its fatal
+  // failures so `out` is never dereferenced when reading failed.
+  ASSERT_NO_FATAL_FAILURE(ReadSingleColumnFile(std::move(file_reader), &out));
+  // Guard the element loop below against a short read.
+  ASSERT_EQ(static_cast<int64_t>(values.size()), out->length());
+
+  Int64Array* out_array = static_cast<Int64Array*>(out.get());
+  for (size_t i = 0; i < values.size(); i++) {
+    EXPECT_EQ(values[i], out_array->raw_data()[i]);
+  }
+}
+
+// Reads an INT64 column written directly with parquet-cpp as four row groups of
+// 25 values; the reader must stitch them into one 100-value array.
+TEST_F(TestParquetIO, SingleColumnInt64ChunkedRead) {
+  std::vector<int64_t> values(100, 128);
+  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4);
+
+  std::shared_ptr<Array> out;
+  // Stop here if the helper hit a fatal failure, instead of dereferencing `out`.
+  ASSERT_NO_FATAL_FAILURE(ReadSingleColumnFile(std::move(file_reader), &out));
+  // Guard the element loop below against a short read.
+  ASSERT_EQ(static_cast<int64_t>(values.size()), out->length());
+
+  Int64Array* out_array = static_cast<Int64Array*>(out.get());
+  for (size_t i = 0; i < values.size(); i++) {
+    EXPECT_EQ(values[i], out_array->raw_data()[i]);
+  }
+}
+
+// Round-trips a non-nullable INT64 column written through FileWriter as a single
+// row group.
+TEST_F(TestParquetIO, SingleColumnInt64Write) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
+
+  std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
+  ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
+  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+  std::shared_ptr<Array> out;
+  ASSERT_NO_FATAL_FAILURE(ReadSingleColumnFile(ReaderFromSink(), &out));
+  ASSERT_TRUE(values->Equals(out));
+}
+
+// Round-trips an OPTIONAL DOUBLE column containing 10 nulls. An OPTIONAL leaf
+// has max_definition_level 1, so writer and reader null handling is exercised.
+TEST_F(TestParquetIO, SingleColumnDoubleReadWrite) {
+  std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+
+  std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
+  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values->length())));
+  ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values.get())));
+  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+  std::shared_ptr<Array> out;
+  ASSERT_NO_FATAL_FAILURE(ReadSingleColumnFile(ReaderFromSink(), &out));
+  ASSERT_TRUE(values->Equals(out));
+}
+
+// Writes a non-nullable INT64 column as four row groups of 25 values and expects
+// them to read back as one 100-value array.
+TEST_F(TestParquetIO, SingleColumnInt64ChunkedWrite) {
+  std::shared_ptr<PrimitiveArray> values = NonNullArray<Int64Type>(100, 128);
+  std::shared_ptr<PrimitiveArray> values_chunk = NonNullArray<Int64Type>(25, 128);
+
+  std::shared_ptr<GroupNode> schema = Schema(ParquetType::INT64, Repetition::REQUIRED);
+  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  for (int i = 0; i < 4; i++) {
+    ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
+    ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
+  }
+  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+  std::shared_ptr<Array> out;
+  ASSERT_NO_FATAL_FAILURE(ReadSingleColumnFile(ReaderFromSink(), &out));
+  ASSERT_TRUE(values->Equals(out));
+}
+
+// Writes an OPTIONAL DOUBLE column as four row groups of 25 values. The first
+// has nulls at indices 0, 2, ..., 18 and the other three have none, so the
+// concatenation equals NullableArray(100, 128, 10), which has the same nulls.
+TEST_F(TestParquetIO, SingleColumnDoubleChunkedWrite) {
+  std::shared_ptr<PrimitiveArray> values = NullableArray<DoubleType>(100, 128, 10);
+  std::shared_ptr<PrimitiveArray> values_chunk_nulls =
+      NullableArray<DoubleType>(25, 128, 10);
+  std::shared_ptr<PrimitiveArray> values_chunk = NullableArray<DoubleType>(25, 128, 0);
+
+  std::shared_ptr<GroupNode> schema = Schema(ParquetType::DOUBLE, Repetition::OPTIONAL);
+  FileWriter writer(default_memory_pool(), MakeWriter(schema));
+  ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk_nulls->length())));
+  ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk_nulls.get())));
+  for (int i = 0; i < 3; i++) {
+    ASSERT_NO_THROW(ASSERT_OK(writer.NewRowGroup(values_chunk->length())));
+    ASSERT_NO_THROW(ASSERT_OK(writer.WriteFlatColumnChunk(values_chunk.get())));
+  }
+  ASSERT_NO_THROW(ASSERT_OK(writer.Close()));
+
+  std::shared_ptr<Array> out;
+  ASSERT_NO_FATAL_FAILURE(ReadSingleColumnFile(ReaderFromSink(), &out));
+  ASSERT_TRUE(values->Equals(out));
+}
+
+}  // namespace parquet
+
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/parquet-reader-test.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/parquet-reader-test.cc b/cpp/src/arrow/parquet/parquet-reader-test.cc
deleted file mode 100644
index a7fc2a8..0000000
--- a/cpp/src/arrow/parquet/parquet-reader-test.cc
+++ /dev/null
@@ -1,116 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "gtest/gtest.h"
-
-#include "arrow/test-util.h"
-#include "arrow/parquet/reader.h"
-#include "arrow/types/primitive.h"
-#include "arrow/util/memory-pool.h"
-#include "arrow/util/status.h"
-
-#include "parquet/api/reader.h"
-#include "parquet/api/writer.h"
-
-using ParquetBuffer = parquet::Buffer;
-using parquet::BufferReader;
-using parquet::InMemoryOutputStream;
-using parquet::Int64Writer;
-using parquet::ParquetFileReader;
-using parquet::ParquetFileWriter;
-using parquet::RandomAccessSource;
-using parquet::Repetition;
-using parquet::SchemaDescriptor;
-using ParquetType = parquet::Type;
-using parquet::schema::GroupNode;
-using parquet::schema::NodePtr;
-using parquet::schema::PrimitiveNode;
-
-namespace arrow {
-
-namespace parquet {
-
-class TestReadParquet : public ::testing::Test {
- public:
-  virtual void SetUp() {}
-
-  std::shared_ptr<GroupNode> Int64Schema() {
-    auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, ParquetType::INT64);
-    NodePtr node_ =
-        GroupNode::Make("schema", Repetition::REQUIRED, std::vector<NodePtr>({pnode}));
-    return std::static_pointer_cast<GroupNode>(node_);
-  }
-
-  std::unique_ptr<ParquetFileReader> Int64File(
-      std::vector<int64_t>& values, int num_chunks) {
-    std::shared_ptr<GroupNode> schema = Int64Schema();
-    std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
-    auto file_writer = ParquetFileWriter::Open(sink, schema);
-    size_t chunk_size = values.size() / num_chunks;
-    for (int i = 0; i < num_chunks; i++) {
-      auto row_group_writer = file_writer->AppendRowGroup(chunk_size);
-      auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
-      int64_t* data = values.data() + i * chunk_size;
-      column_writer->WriteBatch(chunk_size, nullptr, nullptr, data);
-      column_writer->Close();
-      row_group_writer->Close();
-    }
-    file_writer->Close();
-
-    std::shared_ptr<ParquetBuffer> buffer = sink->GetBuffer();
-    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
-    return ParquetFileReader::Open(std::move(source));
-  }
-
- private:
-};
-
-TEST_F(TestReadParquet, SingleColumnInt64) {
-  std::vector<int64_t> values(100, 128);
-  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 1);
-  arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
-  std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
-  ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
-  ASSERT_NE(nullptr, column_reader.get());
-  std::shared_ptr<Array> out;
-  ASSERT_OK(column_reader->NextBatch(100, &out));
-  ASSERT_NE(nullptr, out.get());
-  Int64Array* out_array = static_cast<Int64Array*>(out.get());
-  for (size_t i = 0; i < values.size(); i++) {
-    EXPECT_EQ(values[i], out_array->raw_data()[i]);
-  }
-}
-
-TEST_F(TestReadParquet, SingleColumnInt64Chunked) {
-  std::vector<int64_t> values(100, 128);
-  std::unique_ptr<ParquetFileReader> file_reader = Int64File(values, 4);
-  arrow::parquet::FileReader reader(default_memory_pool(), std::move(file_reader));
-  std::unique_ptr<arrow::parquet::FlatColumnReader> column_reader;
-  ASSERT_NO_THROW(ASSERT_OK(reader.GetFlatColumn(0, &column_reader)));
-  ASSERT_NE(nullptr, column_reader.get());
-  std::shared_ptr<Array> out;
-  ASSERT_OK(column_reader->NextBatch(100, &out));
-  ASSERT_NE(nullptr, out.get());
-  Int64Array* out_array = static_cast<Int64Array*>(out.get());
-  for (size_t i = 0; i < values.size(); i++) {
-    EXPECT_EQ(values[i], out_array->raw_data()[i]);
-  }
-}
-
-}  // namespace parquet
-
-}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/reader.cc b/cpp/src/arrow/parquet/reader.cc
index 481ded5..346de25 100644
--- a/cpp/src/arrow/parquet/reader.cc
+++ b/cpp/src/arrow/parquet/reader.cc
@@ -26,6 +26,7 @@
 #include "arrow/util/status.h"
 
 using parquet::ColumnReader;
+using parquet::Repetition;
 using parquet::TypedColumnReader;
 
 namespace arrow {
@@ -36,6 +37,7 @@ class FileReader::Impl {
   Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader);
   virtual ~Impl() {}
 
+  bool CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr);
   Status GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out);
   Status ReadFlatColumn(int i, std::shared_ptr<Array>* out);
 
@@ -51,7 +53,7 @@ class FlatColumnReader::Impl {
   virtual ~Impl() {}
 
   Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
-  template <typename ArrowType, typename ParquetType, typename CType>
+  template <typename ArrowType, typename ParquetType>
   Status TypedReadBatch(int batch_size, std::shared_ptr<Array>* out);
 
  private:
@@ -67,14 +69,28 @@ class FlatColumnReader::Impl {
 
   PoolBuffer values_buffer_;
   PoolBuffer def_levels_buffer_;
-  PoolBuffer rep_levels_buffer_;
+  PoolBuffer values_builder_buffer_;
+  PoolBuffer valid_bytes_buffer_;
 };
 
 FileReader::Impl::Impl(
     MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileReader> reader)
     : pool_(pool), reader_(std::move(reader)) {}
 
+// A column is flat when it has no repetition levels and at most one definition
+// level; a single definition level is only accepted for an OPTIONAL leaf.
+bool FileReader::Impl::CheckForFlatColumn(const ::parquet::ColumnDescriptor* descr) {
+  if (descr->max_repetition_level() > 0) { return false; }
+  if (descr->max_definition_level() > 1) { return false; }
+  if (descr->max_definition_level() == 1) {
+    return descr->schema_node()->repetition() == Repetition::OPTIONAL;
+  }
+  return true;
+}
+
 Status FileReader::Impl::GetFlatColumn(int i, std::unique_ptr<FlatColumnReader>* out)
{
+  if (!CheckForFlatColumn(reader_->descr()->Column(i))) {
+    return Status::Invalid("The requested column is not flat");
+  }
   std::unique_ptr<FlatColumnReader::Impl> impl(
       new FlatColumnReader::Impl(pool_, reader_->descr()->Column(i), reader_.get(),
i));
   *out = std::unique_ptr<FlatColumnReader>(new FlatColumnReader(std::move(impl)));
@@ -109,37 +125,50 @@ FlatColumnReader::Impl::Impl(MemoryPool* pool, const ::parquet::ColumnDescriptor
       column_index_(column_index),
       next_row_group_(0),
       values_buffer_(pool),
-      def_levels_buffer_(pool),
-      rep_levels_buffer_(pool) {
+      def_levels_buffer_(pool) {
   NodeToField(descr_->schema_node(), &field_);
   NextRowGroup();
 }
 
-template <typename ArrowType, typename ParquetType, typename CType>
+// Reads up to batch_size slots (levels) of this column into a NumericBuilder,
+// moving on to the next row group whenever the current one is exhausted. For an
+// OPTIONAL column (max_definition_level == 1), ReadBatch returns only the
+// non-null values densely packed; they are spread out to one slot per
+// definition level, and nulls are recorded in valid_bytes.
+template <typename ArrowType, typename ParquetType>
 Status FlatColumnReader::Impl::TypedReadBatch(
     int batch_size, std::shared_ptr<Array>* out) {
   int values_to_read = batch_size;
   NumericBuilder<ArrowType> builder(pool_, field_->type);
   while ((values_to_read > 0) && column_reader_) {
-    values_buffer_.Resize(values_to_read * sizeof(CType));
+    RETURN_NOT_OK(
+        values_buffer_.Resize(values_to_read * sizeof(typename ParquetType::c_type)));
     if (descr_->max_definition_level() > 0) {
-      def_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
+      RETURN_NOT_OK(def_levels_buffer_.Resize(values_to_read * sizeof(int16_t)));
     }
-    if (descr_->max_repetition_level() > 0) {
-      rep_levels_buffer_.Resize(values_to_read * sizeof(int16_t));
-    }
     auto reader = dynamic_cast<TypedColumnReader<ParquetType>*>(column_reader_.get());
     int64_t values_read;
-    CType* values = reinterpret_cast<CType*>(values_buffer_.mutable_data());
-    PARQUET_CATCH_NOT_OK(
-        values_to_read -= reader->ReadBatch(values_to_read,
-            reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data()),
-            reinterpret_cast<int16_t*>(rep_levels_buffer_.mutable_data()), values,
-            &values_read));
+    int64_t levels_read;
+    int16_t* def_levels = reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    auto values =
+        reinterpret_cast<typename ParquetType::c_type*>(values_buffer_.mutable_data());
+    // Flat columns have no repetition levels, so none are requested.
+    PARQUET_CATCH_NOT_OK(levels_read = reader->ReadBatch(
+                             values_to_read, def_levels, nullptr, values, &values_read));
+    // Progress is counted in levels (slots), so nulls count toward batch_size.
+    values_to_read -= levels_read;
     if (descr_->max_definition_level() == 0) {
       RETURN_NOT_OK(builder.Append(values, values_read));
     } else {
-      return Status::NotImplemented("no support for definition levels yet");
+      // max_definition_level() == 1: deeper columns fail CheckForFlatColumn in
+      // GetFlatColumn before a FlatColumnReader is created.
+      RETURN_NOT_OK(values_builder_buffer_.Resize(
+          levels_read * sizeof(typename ParquetType::c_type)));
+      RETURN_NOT_OK(valid_bytes_buffer_.Resize(levels_read * sizeof(uint8_t)));
+      auto values_ptr = reinterpret_cast<typename ParquetType::c_type*>(
+          values_builder_buffer_.mutable_data());
+      uint8_t* valid_bytes = valid_bytes_buffer_.mutable_data();
+      int values_idx = 0;
+      for (int64_t i = 0; i < levels_read; i++) {
+        if (def_levels[i] < descr_->max_definition_level()) {
+          // Null slot: its value is left unset; only valid_bytes marks it.
+          valid_bytes[i] = 0;
+        } else {
+          valid_bytes[i] = 1;
+          values_ptr[i] = values[values_idx++];
+        }
+      }
+      RETURN_NOT_OK(builder.Append(values_ptr, levels_read, valid_bytes));
     }
     if (!column_reader_->HasNext()) { NextRowGroup(); }
   }
@@ -147,9 +176,9 @@ Status FlatColumnReader::Impl::TypedReadBatch(
   return Status::OK();
 }
 
-#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType, CType)              \
-  case Type::ENUM:                                                         \
-    return TypedReadBatch<ArrowType, ParquetType, CType>(batch_size, out); \
+// Expands to a `case` of the switch on the Arrow type enum that returns the
+// matching TypedReadBatch<ArrowType, ParquetType> instantiation. The expansion
+// uses `batch_size` and `out` from the enclosing NextBatch scope.
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType)              \
+  case Type::ENUM:                                                  \
+    return TypedReadBatch<ArrowType, ParquetType>(batch_size, out); \
     break;
 
 Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out)
{
@@ -159,15 +188,11 @@ Status FlatColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>*
     return Status::OK();
   }
 
-  if (descr_->max_repetition_level() > 0) {
-    return Status::NotImplemented("no support for repetition yet");
-  }
-
   switch (field_->type->type) {
-    TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type, int32_t)
-    TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type, int64_t)
-    TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType, float)
-    TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType, double)
+    TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
+    TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
+    TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
     default:
       return Status::NotImplemented(field_->type->ToString());
   }

http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/writer.cc
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.cc b/cpp/src/arrow/parquet/writer.cc
new file mode 100644
index 0000000..3ad2c5b
--- /dev/null
+++ b/cpp/src/arrow/parquet/writer.cc
@@ -0,0 +1,148 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "arrow/parquet/writer.h"
+
+#include <algorithm>
+
+#include "arrow/array.h"
+#include "arrow/types/primitive.h"
+#include "arrow/parquet/utils.h"
+#include "arrow/util/status.h"
+
+namespace arrow {
+
+namespace parquet {
+
+// Private implementation of FileWriter. Holds the wrapped parquet-cpp file
+// writer, the currently open row group writer, and scratch buffers used to
+// build definition levels and to compact non-null values before writing.
+class FileWriter::Impl {
+ public:
+  Impl(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
+
+  // Closes the open row group (if any) and appends a new one of chunk_size rows.
+  Status NewRowGroup(int64_t chunk_size);
+  // Writes one whole column chunk through TypedColumnWriter<ParquetType> and
+  // closes that column writer afterwards.
+  template <typename ParquetType>
+  Status TypedWriteBatch(::parquet::ColumnWriter* writer, const PrimitiveArray* data);
+  // Writes `data` as the next column of the current row group.
+  Status WriteFlatColumnChunk(const PrimitiveArray* data);
+  // Closes the open row group (if any) and the underlying file writer.
+  Status Close();
+
+  virtual ~Impl() {}
+
+ private:
+  MemoryPool* pool_;
+  // Scratch space for the non-null values compacted out of a nullable array.
+  PoolBuffer data_buffer_;
+  // Scratch space for one int16_t definition level per array slot.
+  PoolBuffer def_levels_buffer_;
+  std::unique_ptr<::parquet::ParquetFileWriter> writer_;
+  // Currently open row group; nullptr until NewRowGroup is called. Not deleted
+  // here — presumably owned by writer_ (it comes from AppendRowGroup).
+  ::parquet::RowGroupWriter* row_group_writer_;
+};
+
+// Takes ownership of the parquet-cpp file writer. Both scratch buffers allocate
+// from `pool`, so all of this writer's memory comes from the caller's pool.
+// No row group is open until NewRowGroup is called.
+FileWriter::Impl::Impl(
+    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer)
+    : pool_(pool),
+      data_buffer_(pool),
+      def_levels_buffer_(pool),
+      writer_(std::move(writer)),
+      row_group_writer_(nullptr) {}
+
+// Closes the currently open row group, if any, and appends a new one sized for
+// chunk_size rows. The parquet-cpp calls are wrapped in PARQUET_CATCH_NOT_OK,
+// which presumably maps a thrown parquet exception to a non-OK Status.
+Status FileWriter::Impl::NewRowGroup(int64_t chunk_size) {
+  if (row_group_writer_ != nullptr) {
+    PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+    // Forget the closed writer so a failing AppendRowGroup below cannot leave a
+    // stale pointer that Close() or WriteFlatColumnChunk() would use again.
+    row_group_writer_ = nullptr;
+  }
+  PARQUET_CATCH_NOT_OK(row_group_writer_ = writer_->AppendRowGroup(chunk_size));
+  return Status::OK();
+}
+
+// Writes all of `data` through `column_writer`, which must be the
+// TypedColumnWriter for ParquetType, then closes the column writer.
+//  - max_definition_level 0 (REQUIRED): the value buffer is written as-is.
+//  - max_definition_level 1 (OPTIONAL): one definition level per slot is built;
+//    when nulls are present, only the non-null values are passed to WriteBatch,
+//    compacted into data_buffer_.
+//  - deeper nesting returns NotImplemented.
+template <typename ParquetType>
+Status FileWriter::Impl::TypedWriteBatch(
+    ::parquet::ColumnWriter* column_writer, const PrimitiveArray* data) {
+  using c_type = typename ParquetType::c_type;
+  auto data_ptr = reinterpret_cast<const c_type*>(data->data()->data());
+  // TypedColumnWriter<ParquetType> derives from ColumnWriter, so static_cast is
+  // the correct downcast; reinterpret_cast would skip any base-pointer adjustment.
+  auto writer = static_cast<::parquet::TypedColumnWriter<ParquetType>*>(column_writer);
+  if (writer->descr()->max_definition_level() == 0) {
+    // no nulls, just dump the data
+    PARQUET_CATCH_NOT_OK(writer->WriteBatch(data->length(), nullptr, nullptr, data_ptr));
+  } else if (writer->descr()->max_definition_level() == 1) {
+    RETURN_NOT_OK(def_levels_buffer_.Resize(data->length() * sizeof(int16_t)));
+    int16_t* def_levels_ptr =
+        reinterpret_cast<int16_t*>(def_levels_buffer_.mutable_data());
+    if (data->null_count() == 0) {
+      std::fill(def_levels_ptr, def_levels_ptr + data->length(), 1);
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(data->length(), def_levels_ptr, nullptr, data_ptr));
+    } else {
+      RETURN_NOT_OK(data_buffer_.Resize(
+          (data->length() - data->null_count()) * sizeof(c_type)));
+      auto buffer_ptr = reinterpret_cast<c_type*>(data_buffer_.mutable_data());
+      int64_t buffer_idx = 0;
+      for (int64_t i = 0; i < data->length(); i++) {
+        if (data->IsNull(i)) {
+          def_levels_ptr[i] = 0;
+        } else {
+          def_levels_ptr[i] = 1;
+          buffer_ptr[buffer_idx++] = data_ptr[i];
+        }
+      }
+      PARQUET_CATCH_NOT_OK(
+          writer->WriteBatch(data->length(), def_levels_ptr, nullptr, buffer_ptr));
+    }
+  } else {
+    return Status::NotImplemented("no support for max definition level > 1 yet");
+  }
+  PARQUET_CATCH_NOT_OK(writer->Close());
+  return Status::OK();
+}
+
+// Closes the open row group (if any) and then the underlying Parquet file
+// writer. The row group pointer is cleared so it is never closed twice.
+Status FileWriter::Impl::Close() {
+  if (row_group_writer_ != nullptr) {
+    PARQUET_CATCH_NOT_OK(row_group_writer_->Close());
+    row_group_writer_ = nullptr;
+  }
+  PARQUET_CATCH_NOT_OK(writer_->Close());
+  return Status::OK();
+}
+
+// Expands to a `case` that forwards to TypedWriteBatch<ParquetType>, using
+// `writer` and `data` from the enclosing scope. ArrowType is not used by the
+// expansion; it only documents the Arrow/Parquet type pairing.
+#define TYPED_BATCH_CASE(ENUM, ArrowType, ParquetType) \
+  case Type::ENUM:                                     \
+    return TypedWriteBatch<ParquetType>(writer, data); \
+    break;
+
+// Writes `data` as the next column chunk of the current row group. Only the
+// primitive types listed in the switch are supported; others are NotImplemented.
+Status FileWriter::Impl::WriteFlatColumnChunk(const PrimitiveArray* data) {
+  if (row_group_writer_ == nullptr) {
+    return Status::Invalid("NewRowGroup must be called before WriteFlatColumnChunk");
+  }
+  ::parquet::ColumnWriter* writer;
+  PARQUET_CATCH_NOT_OK(writer = row_group_writer_->NextColumn());
+  switch (data->type_enum()) {
+    TYPED_BATCH_CASE(INT32, Int32Type, ::parquet::Int32Type)
+    TYPED_BATCH_CASE(INT64, Int64Type, ::parquet::Int64Type)
+    TYPED_BATCH_CASE(FLOAT, FloatType, ::parquet::FloatType)
+    TYPED_BATCH_CASE(DOUBLE, DoubleType, ::parquet::DoubleType)
+    default:
+      return Status::NotImplemented(data->type()->ToString());
+  }
+}
+
+#undef TYPED_BATCH_CASE
+
+// The public FileWriter is a pimpl facade: every method forwards to Impl.
+FileWriter::FileWriter(
+    MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer) {
+  impl_.reset(new Impl(pool, std::move(writer)));
+}
+
+// Defined here, where Impl is a complete type, so unique_ptr<Impl> can delete it.
+FileWriter::~FileWriter() = default;
+
+Status FileWriter::NewRowGroup(int64_t chunk_size) {
+  return impl_->NewRowGroup(chunk_size);
+}
+
+Status FileWriter::WriteFlatColumnChunk(const PrimitiveArray* data) {
+  return impl_->WriteFlatColumnChunk(data);
+}
+
+Status FileWriter::Close() { return impl_->Close(); }
+
+}  // namespace parquet
+
+}  // namespace arrow

http://git-wip-us.apache.org/repos/asf/arrow/blob/e0fb3698/cpp/src/arrow/parquet/writer.h
----------------------------------------------------------------------
diff --git a/cpp/src/arrow/parquet/writer.h b/cpp/src/arrow/parquet/writer.h
new file mode 100644
index 0000000..38f7d0b
--- /dev/null
+++ b/cpp/src/arrow/parquet/writer.h
@@ -0,0 +1,59 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef ARROW_PARQUET_WRITER_H
+#define ARROW_PARQUET_WRITER_H
+
+#include <memory>
+
+#include "parquet/api/schema.h"
+#include "parquet/api/writer.h"
+
+namespace arrow {
+
+class MemoryPool;
+class PrimitiveArray;
+class RowBatch;
+class Status;
+
+namespace parquet {
+
+/**
+ * Writes Arrow arrays into a Parquet file through a parquet-cpp
+ * ParquetFileWriter.
+ *
+ * Iterative API:
+ *  Start a new RowGroup/Chunk with NewRowGroup
+ *  Write column-by-column the whole column chunk
+ */
+class FileWriter {
+ public:
+  // Takes ownership of `writer`; `pool` is handed to the internal implementation.
+  FileWriter(MemoryPool* pool, std::unique_ptr<::parquet::ParquetFileWriter> writer);
+
+  // Closes the current row group, if any, and starts a new one of chunk_size rows.
+  Status NewRowGroup(int64_t chunk_size);
+  // Writes `data` as the next column chunk of the current row group.
+  // Supported Arrow types: INT32, INT64, FLOAT, DOUBLE. Supported columns have a
+  // max definition level of 0 or 1; deeper nesting returns NotImplemented.
+  Status WriteFlatColumnChunk(const PrimitiveArray* data);
+  // Closes the current row group, if any, and then the underlying file writer.
+  Status Close();
+
+  virtual ~FileWriter();
+
+ private:
+  class Impl;
+  std::unique_ptr<Impl> impl_;
+};
+
+}  // namespace parquet
+
+}  // namespace arrow
+
+#endif  // ARROW_PARQUET_WRITER_H


Mime
View raw message