parquet-commits mailing list archives

From w...@apache.org
Subject parquet-cpp git commit: PARQUET-436: Implement basic Write Support
Date Tue, 26 Apr 2016 17:43:35 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master aa31d025d -> 5fb7d20a9


PARQUET-436: Implement basic Write Support

Basic write support with only DataPage as the page type, no compression, and fixed page sizes. It also allocates and consumes more memory than an optimal solution would, but I hope it is in a state where we can review and merge it, so that the remaining work can be split up afterwards.

One outstanding design question for me is the signature of `WriteDataPage`. We could pass a `DataPage` instance instead, but we would then also need to set `num_rows` and `num_values` correctly for the Thrift metadata. Suggestions are welcome here.
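
To make the distinction concrete: a page's level count (what the Thrift `DataPageHeader.num_values` field records, nulls included), the number of non-null values actually handed to the encoder, and the number of rows can all differ. The following standalone sketch derives the three counts from the level arrays in the same way as the loops in `TypedColumnWriter::WriteBatch`; `PageCounts` and `ComputePageCounts` are hypothetical names, not parquet-cpp API.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical helper (not part of parquet-cpp): the counts a data page
// needs, derived from its definition and repetition levels.
struct PageCounts {
  int64_t num_levels = 0;   // Thrift DataPageHeader.num_values (includes nulls)
  int64_t num_encoded = 0;  // non-null values passed to the value encoder
  int64_t num_rows = 0;     // rows started within these levels
};

// A null level pointer is allowed when the matching max level is 0, i.e. the
// levels are absent (required / non-repeated), as in WriteBatch.
PageCounts ComputePageCounts(int64_t n, const int16_t* def_levels,
                             const int16_t* rep_levels, int16_t max_def,
                             int16_t max_rep) {
  PageCounts c;
  c.num_levels = n;
  for (int64_t i = 0; i < n; ++i) {
    // A value is physically stored only if it is defined at the deepest level.
    if (max_def == 0 || def_levels[i] == max_def) ++c.num_encoded;
    // Repetition level 0 marks the start of a new row.
    if (max_rep == 0 || rep_levels[i] == 0) ++c.num_rows;
  }
  return c;
}
```

For the inputs of the `OptionalRepeatedTooFewRows` test (100 levels, one definition level of 0, `repetition_levels[3] = 1`), this gives 100 levels, 99 encoded values and 99 rows, which is why `Close()` throws when 100 rows are expected.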

There are also numerous TODOs in the code; we need to decide whether they should be addressed as part of this PR or tracked in separate JIRAs.

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #90 from xhochy/parquet-436 and squashes the following commits:

ebfc3e0 [Uwe L. Korn] Address review comments
4a81afe [Uwe L. Korn] Address review comments
80af8ca [Uwe L. Korn] Address review comments
0cf2aa1 [Uwe L. Korn] Address code review comments
77da3c7 [Uwe L. Korn] Style fixes
afbb6da [Uwe L. Korn] Lint fixes
04fb0f2 [Uwe L. Korn] Add JIRA tickets to TODOs
02e0ba5 [Uwe L. Korn] More code cleanup
36b7d1d [Uwe L. Korn] Code cleanups
b581da1 [Uwe L. Korn] Get rid of num_nulls
f3ec4de [Uwe L. Korn] Correct order of definition and repetition levels
e306b04 [Uwe L. Korn] Running write unit test
ad351e8 [Uwe L. Korn] Added more tests for PageWriter
63c06f0 [Uwe L. Korn] Refactored unit tests
ebd9648 [Uwe L. Korn] We can now at least open a self-written file
7bedf34 [Uwe L. Korn] Lint fixes
aa84251 [Uwe L. Korn] Add support for writing repetition levels
b354e07 [Uwe L. Korn] Add support for writing definition levels
7298f96 [Uwe L. Korn] Lint fixes
9fe1d89 [Uwe L. Korn] Some unit test refactorings
9e21c56 [Uwe L. Korn] First version that can write a page
0662386 [Uwe L. Korn] First draft of write API


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/5fb7d20a
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/5fb7d20a
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/5fb7d20a

Branch: refs/heads/master
Commit: 5fb7d20a930543d9e3c41469b5c1d90a1723720b
Parents: aa31d02
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Tue Apr 26 13:43:23 2016 -0400
Committer: Wes McKinney <wesm@apache.org>
Committed: Tue Apr 26 13:43:23 2016 -0400

----------------------------------------------------------------------
 CMakeLists.txt                           |   3 +
 src/parquet/column/CMakeLists.txt        |   1 +
 src/parquet/column/column-writer-test.cc | 202 ++++++++++++++++++++
 src/parquet/column/page.h                |  14 ++
 src/parquet/column/reader.cc             |  12 ++
 src/parquet/column/reader.h              |   3 +-
 src/parquet/column/writer.cc             | 183 ++++++++++++++++++
 src/parquet/column/writer.h              | 206 ++++++++++++++++++++
 src/parquet/encodings/encoder.h          |   2 +
 src/parquet/encodings/plain-encoding.h   |   2 +-
 src/parquet/file/CMakeLists.txt          |   1 +
 src/parquet/file/file-serialize-test.cc  | 105 +++++++++++
 src/parquet/file/writer-internal.cc      | 262 ++++++++++++++++++++++++++
 src/parquet/file/writer-internal.h       | 145 ++++++++++++++
 src/parquet/file/writer.cc               |  83 ++++++++
 src/parquet/file/writer.h                | 167 ++++++++++++++++
 src/parquet/schema/types.cc              |   4 +
 src/parquet/schema/types.h               |   1 +
 18 files changed, 1394 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index f3207c0..7764cc0 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -317,10 +317,13 @@ endif()
 
 set(LIBPARQUET_SRCS
   src/parquet/column/reader.cc
+  src/parquet/column/writer.cc
   src/parquet/column/scanner.cc
 
   src/parquet/file/reader.cc
   src/parquet/file/reader-internal.cc
+  src/parquet/file/writer.cc
+  src/parquet/file/writer-internal.cc
 
   src/parquet/schema/converter.cc
   src/parquet/schema/descriptor.cc

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/column/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt
index fa1ce7a..e11c7a8 100644
--- a/src/parquet/column/CMakeLists.txt
+++ b/src/parquet/column/CMakeLists.txt
@@ -24,5 +24,6 @@ install(FILES
   DESTINATION include/parquet/column)
 
 ADD_PARQUET_TEST(column-reader-test)
+ADD_PARQUET_TEST(column-writer-test)
 ADD_PARQUET_TEST(levels-test)
 ADD_PARQUET_TEST(scanner-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
new file mode 100644
index 0000000..df5aa9a
--- /dev/null
+++ b/src/parquet/column/column-writer-test.cc
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "parquet/file/reader-internal.h"
+#include "parquet/file/writer-internal.h"
+#include "parquet/column/reader.h"
+#include "parquet/column/writer.h"
+#include "parquet/util/input.h"
+#include "parquet/util/output.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+using schema::NodePtr;
+using schema::PrimitiveNode;
+
+namespace test {
+
+class TestPrimitiveWriter : public ::testing::Test {
+ public:
+  void SetUpSchemaRequired() {
+    node_ = PrimitiveNode::Make("int64", Repetition::REQUIRED, Type::INT64);
+    schema_ = std::make_shared<ColumnDescriptor>(node_, 0, 0);
+  }
+
+  void SetUpSchemaOptional() {
+    node_ = PrimitiveNode::Make("int64", Repetition::OPTIONAL, Type::INT64);
+    schema_ = std::make_shared<ColumnDescriptor>(node_, 1, 0);
+  }
+
+  void SetUpSchemaRepeated() {
+    node_ = PrimitiveNode::Make("int64", Repetition::REPEATED, Type::INT64);
+    schema_ = std::make_shared<ColumnDescriptor>(node_, 1, 1);
+  }
+
+  void SetUp() {
+    values_out_.resize(100);
+    definition_levels_out_.resize(100);
+    repetition_levels_out_.resize(100);
+
+    SetUpSchemaRequired();
+  }
+
+  std::unique_ptr<Int64Reader> BuildReader() {
+    auto buffer = sink_->GetBuffer();
+    std::unique_ptr<InMemoryInputStream> source(new InMemoryInputStream(buffer));
+    std::unique_ptr<SerializedPageReader> page_reader(
+        new SerializedPageReader(std::move(source), Compression::UNCOMPRESSED));
+    return std::unique_ptr<Int64Reader>(
+        new Int64Reader(schema_.get(), std::move(page_reader)));
+  }
+
+  std::unique_ptr<Int64Writer> BuildWriter(int64_t output_size = 100) {
+    sink_.reset(new InMemoryOutputStream());
+    std::unique_ptr<SerializedPageWriter> pager(
+        new SerializedPageWriter(sink_.get(), Compression::UNCOMPRESSED, &metadata_));
+    return std::unique_ptr<Int64Writer>(new Int64Writer(schema_.get(), std::move(pager),
+          output_size));
+  }
+
+  void ReadColumn() {
+    auto reader = BuildReader();
+    reader->ReadBatch(values_out_.size(), definition_levels_out_.data(),
+        repetition_levels_out_.data(), values_out_.data(), &values_read_);
+  }
+
+ protected:
+  int64_t values_read_;
+
+  // Output buffers
+  std::vector<int64_t> values_out_;
+  std::vector<int16_t> definition_levels_out_;
+  std::vector<int16_t> repetition_levels_out_;
+
+ private:
+  NodePtr node_;
+  format::ColumnChunk metadata_;
+  std::shared_ptr<ColumnDescriptor> schema_;
+  std::unique_ptr<InMemoryOutputStream> sink_;
+};
+
+TEST_F(TestPrimitiveWriter, RequiredNonRepeated) {
+  std::vector<int64_t> values(100, 128);
+
+  // Test case 1: required and non-repeated, so no definition or repetition levels
+  std::unique_ptr<Int64Writer> writer = BuildWriter();
+  writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
+  writer->Close();
+
+  ReadColumn();
+  ASSERT_EQ(100, values_read_);
+  ASSERT_EQ(values, values_out_);
+}
+
+TEST_F(TestPrimitiveWriter, OptionalNonRepeated) {
+  // Optional and non-repeated, with definition levels
+  // but no repetition levels
+  SetUpSchemaOptional();
+
+  std::vector<int64_t> values(100, 128);
+  std::vector<int16_t> definition_levels(100, 1);
+  definition_levels[1] = 0;
+
+  auto writer = BuildWriter();
+  writer->WriteBatch(values.size(), definition_levels.data(), nullptr, values.data());
+  writer->Close();
+
+  ReadColumn();
+  ASSERT_EQ(99, values_read_);
+  values_out_.resize(99);
+  values.resize(99);
+  ASSERT_EQ(values, values_out_);
+}
+
+TEST_F(TestPrimitiveWriter, OptionalRepeated) {
+  // Optional and repeated, so definition and repetition levels
+  SetUpSchemaRepeated();
+
+  std::vector<int64_t> values(100, 128);
+  std::vector<int16_t> definition_levels(100, 1);
+  definition_levels[1] = 0;
+  std::vector<int16_t> repetition_levels(100, 0);
+
+  auto writer = BuildWriter();
+  writer->WriteBatch(values.size(), definition_levels.data(),
+      repetition_levels.data(), values.data());
+  writer->Close();
+
+  ReadColumn();
+  ASSERT_EQ(99, values_read_);
+  values_out_.resize(99);
+  values.resize(99);
+  ASSERT_EQ(values, values_out_);
+}
+
+TEST_F(TestPrimitiveWriter, RequiredTooFewRows) {
+  std::vector<int64_t> values(99, 128);
+
+  auto writer = BuildWriter();
+  writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
+  ASSERT_THROW(writer->Close(), ParquetException);
+}
+
+TEST_F(TestPrimitiveWriter, RequiredTooMany) {
+  std::vector<int64_t> values(200, 128);
+
+  auto writer = BuildWriter();
+  ASSERT_THROW(writer->WriteBatch(values.size(), nullptr, nullptr, values.data()),
+      ParquetException);
+}
+
+TEST_F(TestPrimitiveWriter, OptionalRepeatedTooFewRows) {
+  // Optional and repeated, so definition and repetition levels
+  SetUpSchemaRepeated();
+
+  std::vector<int64_t> values(100, 128);
+  std::vector<int16_t> definition_levels(100, 1);
+  definition_levels[1] = 0;
+  std::vector<int16_t> repetition_levels(100, 0);
+  repetition_levels[3] = 1;
+
+  auto writer = BuildWriter();
+  writer->WriteBatch(values.size(), definition_levels.data(),
+      repetition_levels.data(), values.data());
+  ASSERT_THROW(writer->Close(), ParquetException);
+}
+
+TEST_F(TestPrimitiveWriter, RequiredNonRepeatedLargeChunk) {
+  std::vector<int64_t> values(10000, 128);
+
+  // Required and non-repeated, so no definition or repetition levels
+  std::unique_ptr<Int64Writer> writer = BuildWriter(10000);
+  writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
+  writer->Close();
+
+  // Just read the first 100 to ensure we could read it back in
+  ReadColumn();
+  ASSERT_EQ(100, values_read_);
+  values.resize(100);
+  ASSERT_EQ(values, values_out_);
+}
+
+} // namespace test
+} // namespace parquet
+
+

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 660f0ff..709f7c8 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -210,6 +210,20 @@ class PageReader {
   virtual std::shared_ptr<Page> NextPage() = 0;
 };
 
+class PageWriter {
+ public:
+  virtual ~PageWriter() {}
+
+  virtual void Close() = 0;
+
+  virtual int64_t WriteDataPage(int32_t num_rows, int32_t num_values,
+      const std::shared_ptr<Buffer>& definition_levels,
+      Encoding::type definition_level_encoding,
+      const std::shared_ptr<Buffer>& repetition_levels,
+      Encoding::type repetition_level_encoding,
+      const std::shared_ptr<Buffer>& values, Encoding::type encoding) = 0;
+};
+
 } // namespace parquet
 
 #endif // PARQUET_COLUMN_PAGE_H

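Because `PageWriter` is an abstract interface, a column writer can in principle be exercised without real serialization. Below is a sketch of a recording test double written against a simplified copy of the interface; `Buffer` as `std::vector<uint8_t>`, the `Encoding` enum, and `RecordingPageWriter` are stand-ins assumed for illustration, not parquet-cpp's own types.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <vector>

// Simplified stand-ins (assumptions) for parquet-cpp's Buffer and Encoding::type.
using Buffer = std::vector<uint8_t>;
enum class Encoding { PLAIN, RLE };

// Mirrors the shape of the PageWriter interface added in page.h.
class PageWriter {
 public:
  virtual ~PageWriter() {}
  virtual void Close() = 0;
  virtual int64_t WriteDataPage(int32_t num_rows, int32_t num_values,
      const std::shared_ptr<Buffer>& definition_levels, Encoding def_encoding,
      const std::shared_ptr<Buffer>& repetition_levels, Encoding rep_encoding,
      const std::shared_ptr<Buffer>& values, Encoding encoding) = 0;
};

// Hypothetical test double: records each page instead of serializing it.
class RecordingPageWriter : public PageWriter {
 public:
  struct PageInfo {
    int32_t num_rows;
    int32_t num_values;
    int64_t bytes;
  };

  void Close() override { closed = true; }

  int64_t WriteDataPage(int32_t num_rows, int32_t num_values,
      const std::shared_ptr<Buffer>& def, Encoding,
      const std::shared_ptr<Buffer>& rep, Encoding,
      const std::shared_ptr<Buffer>& values, Encoding) override {
    // Assumption: report only the payload size; no page header is modeled.
    int64_t bytes = static_cast<int64_t>(def->size() + rep->size() + values->size());
    pages.push_back({num_rows, num_values, bytes});
    return bytes;
  }

  std::vector<PageInfo> pages;
  bool closed = false;
};
```
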
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index a9158e7..f379348 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -224,4 +224,16 @@ std::shared_ptr<ColumnReader> ColumnReader::Make(
   return std::shared_ptr<ColumnReader>(nullptr);
 }
 
+// ----------------------------------------------------------------------
+// Instantiate templated classes
+
+template class TypedColumnReader<Type::BOOLEAN>;
+template class TypedColumnReader<Type::INT32>;
+template class TypedColumnReader<Type::INT64>;
+template class TypedColumnReader<Type::INT96>;
+template class TypedColumnReader<Type::FLOAT>;
+template class TypedColumnReader<Type::DOUBLE>;
+template class TypedColumnReader<Type::BYTE_ARRAY>;
+template class TypedColumnReader<Type::FIXED_LEN_BYTE_ARRAY>;
+
 } // namespace parquet

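Explicit instantiation definitions like the ones added above make the compiler emit every member of the listed specializations in `reader.cc`, so other translation units that only see the declarations can still link against them; `writer.cc` uses the same pattern for `TypedColumnWriter`. A single-file illustration with a hypothetical `Widget` template (in a real project the declaration and the definitions would live in a separate header and source file):

```cpp
#include <cassert>
#include <cstdint>

// "Header" part: declarations only.
template <typename T>
class Widget {
 public:
  explicit Widget(T v);
  T doubled() const;

 private:
  T v_;
};

// "Source" part: out-of-line member definitions.
template <typename T>
Widget<T>::Widget(T v) : v_(v) {}

template <typename T>
T Widget<T>::doubled() const { return v_ + v_; }

// Explicit instantiation definitions: emit all members of these
// specializations in this translation unit.
template class Widget<int32_t>;
template class Widget<int64_t>;
```
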
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index e2b9fd7..0739e7e 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -108,7 +108,8 @@ class TypedColumnReader : public ColumnReader {
   typedef typename type_traits<TYPE>::value_type T;
 
   TypedColumnReader(const ColumnDescriptor* schema,
-      std::unique_ptr<PageReader> pager, MemoryAllocator* allocator) :
+      std::unique_ptr<PageReader> pager,
+      MemoryAllocator* allocator = default_allocator()) :
       ColumnReader(schema, std::move(pager), allocator),
       current_decoder_(NULL) {
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
new file mode 100644
index 0000000..f851316
--- /dev/null
+++ b/src/parquet/column/writer.cc
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column/writer.h"
+
+#include "parquet/encodings/plain-encoding.h"
+
+namespace parquet {
+
+// ----------------------------------------------------------------------
+// ColumnWriter
+
+ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
+    std::unique_ptr<PageWriter> pager, int64_t expected_rows,
+    MemoryAllocator* allocator):
+    descr_(descr), pager_(std::move(pager)), expected_rows_(expected_rows),
+    allocator_(allocator),
+    num_buffered_values_(0), num_buffered_encoded_values_(0),
+    num_rows_(0), total_bytes_written_(0) {
+  InitSinks();
+}
+
+void ColumnWriter::InitSinks() {
+  definition_levels_sink_.reset(new InMemoryOutputStream());
+  repetition_levels_sink_.reset(new InMemoryOutputStream());
+  values_sink_.reset(new InMemoryOutputStream());
+}
+
+void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, int16_t* levels) {
+  definition_levels_sink_->Write(reinterpret_cast<uint8_t*>(levels),
+      sizeof(int16_t) * num_levels);
+}
+
+void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, int16_t* levels) {
+  repetition_levels_sink_->Write(reinterpret_cast<uint8_t*>(levels),
+      sizeof(int16_t) * num_levels);
+}
+
+std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels(
+    const std::shared_ptr<Buffer>& buffer, int16_t max_level) {
+  // TODO: This only works due to some RLE specifics
+  int64_t rle_size = 2 * num_buffered_values_ + sizeof(uint32_t);
+  auto buffer_rle = std::make_shared<OwnedMutableBuffer>(rle_size, allocator_);
+  level_encoder_.Init(Encoding::RLE, max_level,
+      num_buffered_values_, buffer_rle->mutable_data() + sizeof(uint32_t),
+      buffer_rle->size() - sizeof(uint32_t));
+  int encoded = level_encoder_.Encode(num_buffered_values_,
+      reinterpret_cast<const int16_t*>(buffer->data()));
+  DCHECK_EQ(encoded, num_buffered_values_);
+  reinterpret_cast<uint32_t*>(buffer_rle->mutable_data())[0] = level_encoder_.len();
+  int64_t encoded_size = level_encoder_.len() + sizeof(uint32_t);
+  DCHECK(rle_size >= encoded_size);
+  buffer_rle->Resize(encoded_size);
+  return std::static_pointer_cast<Buffer>(buffer_rle);
+}
+
+void ColumnWriter::WriteNewPage() {
+  // TODO: Currently we only support writing DataPages
+  std::shared_ptr<Buffer> definition_levels = definition_levels_sink_->GetBuffer();
+  std::shared_ptr<Buffer> repetition_levels = repetition_levels_sink_->GetBuffer();
+  std::shared_ptr<Buffer> values = values_sink_->GetBuffer();
+
+  if (descr_->max_definition_level() > 0) {
+    definition_levels = RleEncodeLevels(definition_levels,
+        descr_->max_definition_level());
+  }
+
+  if (descr_->max_repetition_level() > 0) {
+    repetition_levels = RleEncodeLevels(repetition_levels,
+        descr_->max_repetition_level());
+  }
+
+  // TODO(PARQUET-590): Encodings are hard-coded
+  int64_t bytes_written = pager_->WriteDataPage(num_buffered_values_,
+          num_buffered_encoded_values_,
+          definition_levels, Encoding::RLE,
+          repetition_levels, Encoding::RLE,
+          values, Encoding::PLAIN);
+  total_bytes_written_ += bytes_written;
+
+  // Re-initialize the sinks as GetBuffer made them invalid.
+  InitSinks();
+  num_buffered_values_ = 0;
+  num_buffered_encoded_values_ = 0;
+}
+
+int64_t ColumnWriter::Close() {
+  // Write all outstanding data to a new page
+  if (num_buffered_values_ > 0) {
+    WriteNewPage();
+  }
+
+  if (num_rows_ != expected_rows_) {
+    throw ParquetException("Less than the number of expected rows written in"
+        " the current column chunk");
+  }
+
+  pager_->Close();
+
+  return total_bytes_written_;
+}
+
+// ----------------------------------------------------------------------
+// TypedColumnWriter
+
+template <int TYPE>
+TypedColumnWriter<TYPE>::TypedColumnWriter(const ColumnDescriptor* schema,
+      std::unique_ptr<PageWriter> pager, int64_t expected_rows,
+      MemoryAllocator* allocator) :
+      ColumnWriter(schema, std::move(pager), expected_rows, allocator) {
+  // TODO(PARQUET-590) Get decoder type from WriterProperties
+  current_encoder_ = std::unique_ptr<EncoderType>(
+      new PlainEncoder<TYPE>(schema, allocator));
+}
+
+// ----------------------------------------------------------------------
+// Dynamic column writer constructor
+
+std::shared_ptr<ColumnWriter> ColumnWriter::Make(
+    const ColumnDescriptor* descr,
+    std::unique_ptr<PageWriter> pager,
+    int64_t expected_rows,
+    MemoryAllocator* allocator) {
+  switch (descr->physical_type()) {
+    case Type::BOOLEAN:
+      return std::make_shared<BoolWriter>(descr, std::move(pager), expected_rows,
+          allocator);
+    case Type::INT32:
+      return std::make_shared<Int32Writer>(descr, std::move(pager), expected_rows,
+          allocator);
+    case Type::INT64:
+      return std::make_shared<Int64Writer>(descr, std::move(pager), expected_rows,
+          allocator);
+    case Type::INT96:
+      return std::make_shared<Int96Writer>(descr, std::move(pager), expected_rows,
+          allocator);
+    case Type::FLOAT:
+      return std::make_shared<FloatWriter>(descr, std::move(pager), expected_rows,
+          allocator);
+    case Type::DOUBLE:
+      return std::make_shared<DoubleWriter>(descr, std::move(pager), expected_rows,
+          allocator);
+    case Type::BYTE_ARRAY:
+      return std::make_shared<ByteArrayWriter>(descr, std::move(pager), expected_rows,
+          allocator);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_shared<FixedLenByteArrayWriter>(descr,
+          std::move(pager), expected_rows, allocator);
+    default:
+      ParquetException::NYI("type writer not implemented");
+  }
+  // Unreachable code, but suppress compiler warning
+  return std::shared_ptr<ColumnWriter>(nullptr);
+}
+
+// ----------------------------------------------------------------------
+// Instantiate templated classes
+
+template class TypedColumnWriter<Type::BOOLEAN>;
+template class TypedColumnWriter<Type::INT32>;
+template class TypedColumnWriter<Type::INT64>;
+template class TypedColumnWriter<Type::INT96>;
+template class TypedColumnWriter<Type::FLOAT>;
+template class TypedColumnWriter<Type::DOUBLE>;
+template class TypedColumnWriter<Type::BYTE_ARRAY>;
+template class TypedColumnWriter<Type::FIXED_LEN_BYTE_ARRAY>;
+
+
+} // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
new file mode 100644
index 0000000..b9ea265
--- /dev/null
+++ b/src/parquet/column/writer.h
@@ -0,0 +1,206 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_WRITER_H
+#define PARQUET_COLUMN_WRITER_H
+
+#include "parquet/column/levels.h"
+#include "parquet/column/page.h"
+#include "parquet/encodings/encoder.h"
+#include "parquet/schema/descriptor.h"
+#include "parquet/types.h"
+#include "parquet/util/mem-allocator.h"
+#include "parquet/util/output.h"
+
+namespace parquet {
+
+class ColumnWriter {
+ public:
+  ColumnWriter(const ColumnDescriptor*, std::unique_ptr<PageWriter>,
+      int64_t expected_rows, MemoryAllocator* allocator = default_allocator());
+
+  static std::shared_ptr<ColumnWriter> Make(const ColumnDescriptor*,
+      std::unique_ptr<PageWriter>, int64_t expected_rows,
+      MemoryAllocator* allocator = default_allocator());
+
+  Type::type type() const {
+    return descr_->physical_type();
+  }
+
+  const ColumnDescriptor* descr() const {
+    return descr_;
+  }
+
+  /**
+   * Closes the ColumnWriter, commits any buffered values to pages.
+   *
+   * @return Total size of the column in bytes
+   */
+  int64_t Close();
+
+ protected:
+  void WriteNewPage();
+
+  // Write multiple definition levels
+  void WriteDefinitionLevels(int64_t num_levels, int16_t* levels);
+
+  // Write multiple repetition levels
+  void WriteRepetitionLevels(int64_t num_levels, int16_t* levels);
+
+  std::shared_ptr<Buffer> RleEncodeLevels(const std::shared_ptr<Buffer>& buffer,
+      int16_t max_level);
+
+  const ColumnDescriptor* descr_;
+
+  std::unique_ptr<PageWriter> pager_;
+
+  // The number of rows that should be written in this column chunk.
+  int64_t expected_rows_;
+
+  LevelEncoder level_encoder_;
+
+  MemoryAllocator* allocator_;
+
+  // The total number of values stored in the data page. This is the maximum of
+  // the number of encoded definition levels or encoded values. For
+  // non-repeated, required columns, this is equal to the number of encoded
+  // values. For repeated or optional values, there may be fewer data values
+  // than levels, and this tells you how many encoded levels there are in that
+  // case.
+  int num_buffered_values_;
+
+  // The total number of stored values. For repeated or optional values, this
+  // number may be lower than num_buffered_values_.
+  int num_buffered_encoded_values_;
+
+  // Total number of rows written with this ColumnWriter
+  int num_rows_;
+
+  int64_t total_bytes_written_;
+
+  std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
+  std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
+  std::unique_ptr<InMemoryOutputStream> values_sink_;
+
+ private:
+  void InitSinks();
+};
+
+// API to write values to a single column. This is the main client facing API.
+template <int TYPE>
+class TypedColumnWriter : public ColumnWriter {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  TypedColumnWriter(const ColumnDescriptor* schema,
+      std::unique_ptr<PageWriter> pager, int64_t expected_rows,
+      MemoryAllocator* allocator = default_allocator());
+
+  // Write a batch of repetition levels, definition levels, and values to the
+  // column.
+  void WriteBatch(int64_t num_values, int16_t* def_levels, int16_t* rep_levels,
+      T* values);
+
+ private:
+  typedef Encoder<TYPE> EncoderType;
+
+  // Write values to a temporary buffer before they are encoded into pages
+  void WriteValues(int64_t num_values, T* values);
+
+  // Map of encoding type to the respective encoder object. For example, a
+  // column chunk's data pages may include both dictionary-encoded and
+  // plain-encoded data.
+  std::unordered_map<int, std::shared_ptr<EncoderType> > encoders_;
+
+  void ConfigureDictionary(const DictionaryPage* page);
+
+  std::unique_ptr<EncoderType> current_encoder_;
+};
+
+// TODO(PARQUET-591): This is just chosen at random, we should make better estimates.
+// See also: parquet-column/../column/impl/ColumnWriteStoreV2.java:sizeCheck
+const int64_t PAGE_VALUE_COUNT = 1000;
+
+template <int TYPE>
+inline void TypedColumnWriter<TYPE>::WriteBatch(int64_t num_values, int16_t* def_levels,
+    int16_t* rep_levels, T* values) {
+  int64_t values_to_write = 0;
+
+  // If the field is required and non-repeated, there are no definition levels
+  if (descr_->max_definition_level() > 0) {
+    for (int64_t i = 0; i < num_values; ++i) {
+      if (def_levels[i] == descr_->max_definition_level()) {
+        ++values_to_write;
+      }
+    }
+
+    WriteDefinitionLevels(num_values, def_levels);
+  } else {
+    // Required field, write all values
+    values_to_write = num_values;
+  }
+
+  // Not present for non-repeated fields
+  if (descr_->max_repetition_level() > 0) {
+    // A row could include more than one value
+    // Count the occasions where we start a new row
+    for (int64_t i = 0; i < num_values; ++i) {
+      if (rep_levels[i] == 0) {
+        num_rows_++;
+      }
+    }
+
+    WriteRepetitionLevels(num_values, rep_levels);
+  } else {
+    // Each value is exactly one row
+    num_rows_ += num_values;
+  }
+
+  if (num_rows_ > expected_rows_) {
+    throw ParquetException("More rows were written in the column chunk than expected");
+  }
+
+  WriteValues(values_to_write, values);
+
+  num_buffered_values_ += num_values;
+  num_buffered_encoded_values_ += values_to_write;
+
+  // TODO(PARQUET-591): Instead of rows as a boundary, do a size check
+  if (num_buffered_values_ >= PAGE_VALUE_COUNT) {
+    WriteNewPage();
+  }
+}
+
+template <int TYPE>
+void TypedColumnWriter<TYPE>::WriteValues(int64_t num_values, T* values) {
+  current_encoder_->Encode(values, num_values, values_sink_.get());
+}
+
+
+typedef TypedColumnWriter<Type::BOOLEAN> BoolWriter;
+typedef TypedColumnWriter<Type::INT32> Int32Writer;
+typedef TypedColumnWriter<Type::INT64> Int64Writer;
+typedef TypedColumnWriter<Type::INT96> Int96Writer;
+typedef TypedColumnWriter<Type::FLOAT> FloatWriter;
+typedef TypedColumnWriter<Type::DOUBLE> DoubleWriter;
+typedef TypedColumnWriter<Type::BYTE_ARRAY> ByteArrayWriter;
+typedef TypedColumnWriter<Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayWriter;
+
+} // namespace parquet
+
+#endif // PARQUET_COLUMN_WRITER_H
+

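`WriteBatch` compares `num_buffered_values_` against `PAGE_VALUE_COUNT` only once, after the whole batch has been buffered. One consequence is that the `RequiredNonRepeatedLargeChunk` test, which writes 10,000 values in a single call, produces one page holding all 10,000 values rather than ten 1,000-value pages. The toy model below reproduces this policy together with the row-count checks; `PageSplitModel` is hypothetical and throws `std::runtime_error` where the real writer throws `ParquetException`.

```cpp
#include <cassert>
#include <cstdint>
#include <stdexcept>
#include <vector>

// Hypothetical model of TypedColumnWriter's page-splitting policy.
class PageSplitModel {
 public:
  PageSplitModel(int64_t expected_rows, int64_t page_value_count)
      : expected_rows_(expected_rows), page_value_count_(page_value_count) {}

  // rows_in_batch equals num_values for non-repeated columns.
  void WriteBatch(int64_t num_values, int64_t rows_in_batch) {
    num_rows_ += rows_in_batch;
    if (num_rows_ > expected_rows_) throw std::runtime_error("too many rows");
    buffered_ += num_values;
    // Threshold checked once per batch, as in WriteBatch.
    if (buffered_ >= page_value_count_) FlushPage();
  }

  void Close() {
    if (buffered_ > 0) FlushPage();  // write outstanding values
    if (num_rows_ != expected_rows_) throw std::runtime_error("too few rows");
  }

  const std::vector<int64_t>& page_sizes() const { return page_sizes_; }

 private:
  void FlushPage() {
    page_sizes_.push_back(buffered_);
    buffered_ = 0;
  }

  int64_t expected_rows_;
  int64_t page_value_count_;
  int64_t num_rows_ = 0;
  int64_t buffered_ = 0;
  std::vector<int64_t> page_sizes_;
};
```

Writing five batches of 500 values with a threshold of 1,000 gives pages of 1,000, 1,000 and 500 values, the last one written by `Close()`.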
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
index 6a962f7..d7af2f9 100644
--- a/src/parquet/encodings/encoder.h
+++ b/src/parquet/encodings/encoder.h
@@ -39,6 +39,8 @@ class Encoder {
 
   virtual ~Encoder() {}
 
+  virtual void Encode(const T* src, int num_values, OutputStream* dst) = 0;
+
   const Encoding::type encoding() const { return encoding_; }
 
  protected:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index e89c513..eee4463 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -177,7 +177,7 @@ class PlainEncoder : public Encoder<TYPE> {
       MemoryAllocator* allocator = default_allocator()) :
       Encoder<TYPE>(descr, Encoding::PLAIN, allocator) {}
 
-  void Encode(const T* src, int num_values, OutputStream* dst);
+  void Encode(const T* src, int num_values, OutputStream* dst) override;
 };
 
 template <>

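With `Encode` now a pure virtual method of `Encoder<TYPE>`, `TypedColumnWriter` can call it through `current_encoder_` without knowing the concrete encoder. For fixed-width numeric types, PLAIN encoding lays the values out back to back in little-endian byte order. Below is a standalone sketch for INT64, with `std::vector<uint8_t>` standing in for parquet-cpp's `OutputStream` (an assumption); `PlainEncodeInt64` is a hypothetical name. BOOLEAN (bit-packed) and BYTE_ARRAY (length-prefixed) use different PLAIN layouts and are not covered.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// PLAIN-encode INT64 values: each value's 8 bytes, little-endian,
// independent of the host byte order.
void PlainEncodeInt64(const int64_t* src, int num_values, std::vector<uint8_t>* dst) {
  for (int i = 0; i < num_values; ++i) {
    uint64_t v = static_cast<uint64_t>(src[i]);
    for (int b = 0; b < 8; ++b) {
      dst->push_back(static_cast<uint8_t>(v >> (8 * b)));
    }
  }
}
```
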
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/file/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/file/CMakeLists.txt b/src/parquet/file/CMakeLists.txt
index b7a65f1..682a392 100644
--- a/src/parquet/file/CMakeLists.txt
+++ b/src/parquet/file/CMakeLists.txt
@@ -20,3 +20,4 @@ install(FILES
   DESTINATION include/parquet/file)
 
 ADD_PARQUET_TEST(file-deserialize-test)
+ADD_PARQUET_TEST(file-serialize-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
new file mode 100644
index 0000000..a75d250
--- /dev/null
+++ b/src/parquet/file/file-serialize-test.cc
@@ -0,0 +1,105 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include "parquet/column/reader.h"
+#include "parquet/column/writer.h"
+#include "parquet/file/reader.h"
+#include "parquet/file/writer.h"
+#include "parquet/types.h"
+#include "parquet/util/input.h"
+#include "parquet/util/output.h"
+
+namespace parquet {
+
+using schema::GroupNode;
+using schema::NodePtr;
+using schema::PrimitiveNode;
+
+namespace test {
+
+class TestSerialize : public ::testing::Test {
+ public:
+  void SetUpSchemaRequired() {
+    auto pnode = PrimitiveNode::Make("int64", Repetition::REQUIRED, Type::INT64);
+    node_ = GroupNode::Make("schema", Repetition::REQUIRED,
+        std::vector<NodePtr>({pnode}));
+    schema_.Init(node_);
+  }
+
+  void SetUpSchemaOptional() {
+    auto pnode = PrimitiveNode::Make("int64", Repetition::OPTIONAL, Type::INT64);
+    node_ = GroupNode::Make("schema", Repetition::REQUIRED,
+        std::vector<NodePtr>({pnode}));
+    schema_.Init(node_);
+  }
+
+  void SetUpSchemaRepeated() {
+    auto pnode = PrimitiveNode::Make("int64", Repetition::REPEATED, Type::INT64);
+    node_ = GroupNode::Make("schema", Repetition::REQUIRED,
+        std::vector<NodePtr>({pnode}));
+    schema_.Init(node_);
+  }
+
+  void SetUp() {
+    SetUpSchemaRequired();
+  }
+
+ protected:
+  NodePtr node_;
+  SchemaDescriptor schema_;
+};
+
+
+TEST_F(TestSerialize, SmallFile) {
+  std::shared_ptr<InMemoryOutputStream> sink(new InMemoryOutputStream());
+  auto gnode = std::static_pointer_cast<GroupNode>(node_);
+  auto file_writer = ParquetFileWriter::Open(sink, gnode);
+  auto row_group_writer = file_writer->AppendRowGroup(100);
+  auto column_writer = static_cast<Int64Writer*>(row_group_writer->NextColumn());
+  std::vector<int64_t> values(100, 128);
+  column_writer->WriteBatch(values.size(), nullptr, nullptr, values.data());
+  column_writer->Close();
+  row_group_writer->Close();
+  file_writer->Close();
+
+  auto buffer = sink->GetBuffer();
+  std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
+  auto file_reader = ParquetFileReader::Open(std::move(source));
+  ASSERT_EQ(1, file_reader->num_columns());
+  ASSERT_EQ(1, file_reader->num_row_groups());
+  ASSERT_EQ(100, file_reader->num_rows());
+
+  auto rg_reader = file_reader->RowGroup(0);
+  ASSERT_EQ(1, rg_reader->num_columns());
+  ASSERT_EQ(100, rg_reader->num_rows());
+
+  auto col_reader = std::static_pointer_cast<Int64Reader>(rg_reader->Column(0));
+  std::vector<int64_t> values_out(100);
+  std::vector<int16_t> def_levels_out(100);
+  std::vector<int16_t> rep_levels_out(100);
+  int64_t values_read;
+  col_reader->ReadBatch(values_out.size(), def_levels_out.data(), rep_levels_out.data(),
+      values_out.data(), &values_read);
+  ASSERT_EQ(100, values_read);
+  ASSERT_EQ(values, values_out);
+}
+
+} // namespace test
+
+} // namespace parquet

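The test reopens the file it has just written, so the bytes must follow the layout readers expect. The file begins with `PAR1` and ends with the serialized `FileMetaData`, followed by its length as a 4-byte little-endian integer and `PAR1` again. `FileSerializer::WriteMetaData()` writes that length by reinterpreting a `uint32_t`, i.e. in host byte order, which matches the format on little-endian hosts. Below is a library-free sketch of the footer layout and of how a reader locates the metadata. `AppendFooter` and `ReadFooterMetadata` are hypothetical helpers, and the metadata is treated as opaque bytes rather than Thrift.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <stdexcept>
#include <vector>

static const uint8_t kMagic[4] = {'P', 'A', 'R', '1'};

// Append: <metadata bytes> <4-byte little-endian metadata length> "PAR1".
void AppendFooter(std::vector<uint8_t>* file, const std::vector<uint8_t>& metadata) {
  assert(metadata.size() <= UINT32_MAX);
  file->insert(file->end(), metadata.begin(), metadata.end());
  uint32_t len = static_cast<uint32_t>(metadata.size());
  for (int b = 0; b < 4; ++b) {
    file->push_back(static_cast<uint8_t>(len >> (8 * b)));  // explicit little-endian
  }
  file->insert(file->end(), kMagic, kMagic + 4);
}

// Reader side: verify both magic markers, then read the length stored
// just before the trailing magic and slice out the metadata bytes.
std::vector<uint8_t> ReadFooterMetadata(const std::vector<uint8_t>& file) {
  if (file.size() < 12 || std::memcmp(file.data(), kMagic, 4) != 0 ||
      std::memcmp(file.data() + file.size() - 4, kMagic, 4) != 0) {
    throw std::runtime_error("not a Parquet file");
  }
  std::size_t len_pos = file.size() - 8;
  uint32_t len = 0;
  for (int b = 0; b < 4; ++b) {
    len |= static_cast<uint32_t>(file[len_pos + b]) << (8 * b);
  }
  // The metadata must not overlap the leading "PAR1".
  if (len > len_pos - 4) {
    throw std::runtime_error("corrupt footer length");
  }
  return std::vector<uint8_t>(file.begin() + (len_pos - len), file.begin() + len_pos);
}
```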
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
new file mode 100644
index 0000000..f087752
--- /dev/null
+++ b/src/parquet/file/writer-internal.cc
@@ -0,0 +1,262 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file/writer-internal.h"
+
+#include "parquet/column/writer.h"
+#include "parquet/schema/converter.h"
+#include "parquet/thrift/util.h"
+#include "parquet/util/output.h"
+
+using parquet::schema::GroupNode;
+using parquet::schema::SchemaFlattener;
+
+namespace parquet {
+
+// FIXME: copied from reader-internal.cc
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// ----------------------------------------------------------------------
+// SerializedPageWriter
+
+SerializedPageWriter::SerializedPageWriter(OutputStream* sink,
+        Compression::type codec, format::ColumnChunk* metadata,
+        MemoryAllocator* allocator) : sink_(sink), metadata_(metadata),
+        allocator_(allocator) {
+  compressor_ = Codec::Create(codec);
+  // Currently we directly start with the data page
+  metadata_->meta_data.__set_data_page_offset(sink_->Tell());
+  metadata_->meta_data.__set_codec(ToThrift(codec));
+}
+
+void SerializedPageWriter::Close() {}
+
+void SerializedPageWriter::AddEncoding(Encoding::type encoding) {
+  auto it = std::find(metadata_->meta_data.encodings.begin(),
+      metadata_->meta_data.encodings.end(), ToThrift(encoding));
+  if (it == metadata_->meta_data.encodings.end()) {
+    metadata_->meta_data.encodings.push_back(ToThrift(encoding));
+  }
+}
+
+int64_t SerializedPageWriter::WriteDataPage(int32_t num_rows, int32_t num_values,
+    const std::shared_ptr<Buffer>& definition_levels,
+    Encoding::type definition_level_encoding,
+    const std::shared_ptr<Buffer>& repetition_levels,
+    Encoding::type repetition_level_encoding,
+    const std::shared_ptr<Buffer>& values, Encoding::type encoding) {
+  int64_t uncompressed_size = definition_levels->size() + repetition_levels->size()
+    + values->size();
+
+  // Concatenate data into a single buffer
+  // TODO: In the uncompressed case, directly write this to the sink
+  // TODO: Reuse the (un)compressed_data buffer instead of recreating it each time.
+  std::shared_ptr<OwnedMutableBuffer> uncompressed_data =
+    std::make_shared<OwnedMutableBuffer>(uncompressed_size, allocator_);
+  uint8_t* uncompressed_ptr = uncompressed_data->mutable_data();
+  memcpy(uncompressed_ptr, repetition_levels->data(), repetition_levels->size());
+  uncompressed_ptr += repetition_levels->size();
+  memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size());
+  uncompressed_ptr += definition_levels->size();
+  memcpy(uncompressed_ptr, values->data(), values->size());
+
+  // Compress the data
+  int64_t compressed_size = uncompressed_size;
+  std::shared_ptr<OwnedMutableBuffer> compressed_data = uncompressed_data;
+  if (compressor_) {
+    // TODO(PARQUET-592): Add support for compression
+    // int64_t max_compressed_size = compressor_->MaxCompressedLen(
+    // uncompressed_data.size(), uncompressed_data.data());
+    // OwnedMutableBuffer compressed_data(compressor_->MaxCompressedLen(
+    // uncompressed_data.size(), uncompressed_data.data()));
+  }
+  // The uncompressed buffer is no longer needed, so release our reference to it.
+  uncompressed_data.reset();
+
+  format::DataPageHeader data_page_header;
+  data_page_header.__set_num_values(num_values);
+  data_page_header.__set_encoding(ToThrift(encoding));
+  data_page_header.__set_definition_level_encoding(ToThrift(definition_level_encoding));
+  data_page_header.__set_repetition_level_encoding(ToThrift(repetition_level_encoding));
+  // TODO(PARQUET-593) statistics
+
+  format::PageHeader page_header;
+  page_header.__set_type(format::PageType::DATA_PAGE);
+  page_header.__set_uncompressed_page_size(uncompressed_size);
+  page_header.__set_compressed_page_size(compressed_size);
+  page_header.__set_data_page_header(data_page_header);
+  // TODO(PARQUET-594) crc checksum
+
+  int64_t start_pos = sink_->Tell();
+  SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
+  int64_t header_size = sink_->Tell() - start_pos;
+  sink_->Write(compressed_data->data(), compressed_data->size());
+
+  metadata_->meta_data.total_uncompressed_size += uncompressed_size + header_size;
+  metadata_->meta_data.total_compressed_size += compressed_size + header_size;
+  metadata_->meta_data.num_values += num_values;
+
+  return sink_->Tell() - start_pos;
+}
+
+// ----------------------------------------------------------------------
+// RowGroupSerializer
+
+int RowGroupSerializer::num_columns() const {
+  return schema_->num_columns();
+}
+
+int64_t RowGroupSerializer::num_rows() const {
+  return num_rows_;
+}
+
+const SchemaDescriptor* RowGroupSerializer::schema() const  {
+  return schema_;
+}
+
+ColumnWriter* RowGroupSerializer::NextColumn() {
+  if (current_column_index_ == schema_->num_columns() - 1) {
+    throw ParquetException("All columns have already been written.");
+  }
+  current_column_index_++;
+
+  if (current_column_writer_) {
+    total_bytes_written_ += current_column_writer_->Close();
+  }
+
+  const ColumnDescriptor* column_descr = schema_->Column(current_column_index_);
+  format::ColumnChunk* col_meta = &metadata_->columns[current_column_index_];
+  col_meta->__isset.meta_data = true;
+  col_meta->meta_data.__set_type(ToThrift(column_descr->physical_type()));
+  col_meta->meta_data.__set_path_in_schema(column_descr->path()->ToDotVector());
+  std::unique_ptr<PageWriter> pager(new SerializedPageWriter(sink_,
+        Compression::UNCOMPRESSED, col_meta,
+        allocator_));
+  current_column_writer_ = ColumnWriter::Make(column_descr,
+      std::move(pager), num_rows_, allocator_);
+  return current_column_writer_.get();
+}
+
+void RowGroupSerializer::Close() {
+  if (current_column_index_ != schema_->num_columns() - 1) {
+    throw ParquetException("Not all columns were written in the current row group.");
+  }
+
+  if (current_column_writer_) {
+    total_bytes_written_ += current_column_writer_->Close();
+    current_column_writer_.reset();
+  }
+
+  metadata_->__set_total_byte_size(total_bytes_written_);
+}
+
+// ----------------------------------------------------------------------
+// FileSerializer
+
+std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open(
+    std::shared_ptr<OutputStream> sink, std::shared_ptr<GroupNode>& schema,
+    MemoryAllocator* allocator) {
+  std::unique_ptr<ParquetFileWriter::Contents> result(
+      new FileSerializer(sink, schema, allocator));
+
+  return result;
+}
+
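+void FileSerializer::Close() {
+  // Also reached from ~FileSerializer(); only the first call writes the footer.
+  if (!sink_) return;
+  if (row_group_writer_) row_group_writer_->Close();
+  row_group_writer_.reset();
+  // Write magic bytes and metadata
+  WriteMetaData();
+  sink_->Close();
+  // Drop our reference so that a repeated Close() becomes a no-op.
+  sink_.reset();
+}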
+
+int FileSerializer::num_columns() const {
+  return schema_.num_columns();
+}
+
+int FileSerializer::num_row_groups() const {
+  return num_row_groups_;
+}
+
+int64_t FileSerializer::num_rows() const {
+  return num_rows_;
+}
+
+RowGroupWriter* FileSerializer::AppendRowGroup(int64_t num_rows) {
+  if (row_group_writer_) {
+    row_group_writer_->Close();
+  }
+  num_rows_ += num_rows;
+  num_row_groups_++;
+
+  auto rgm_size = row_group_metadata_.size();
+  row_group_metadata_.resize(rgm_size + 1);
+  format::RowGroup* rg_metadata = &row_group_metadata_.data()[rgm_size];
+  std::unique_ptr<RowGroupWriter::Contents> contents(
+      new RowGroupSerializer(num_rows, &schema_, sink_.get(), rg_metadata, allocator_));
+  row_group_writer_.reset(new RowGroupWriter(std::move(contents), allocator_));
+  return row_group_writer_.get();
+}
+
+FileSerializer::~FileSerializer() {
+  Close();
+}
+
+void FileSerializer::WriteMetaData() {
+  // Write MetaData
+  uint32_t metadata_len = sink_->Tell();
+
+  SchemaFlattener flattener(static_cast<GroupNode*>(schema_.schema().get()),
+      &metadata_.schema);
+  flattener.Flatten();
+
+  // TODO: Currently we only write version 1 files
+  metadata_.__set_version(1);
+  metadata_.__set_num_rows(num_rows_);
+  metadata_.__set_row_groups(row_group_metadata_);
+  // TODO(PARQUET-595) Support key_value_metadata
+  // TODO(PARQUET-590) Get from WriterProperties
+  metadata_.__set_created_by("parquet-cpp");
+
+  SerializeThriftMsg(&metadata_, 1024, sink_.get());
+  metadata_len = sink_->Tell() - metadata_len;
+
+  // Write Footer
+  sink_->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
+  sink_->Write(PARQUET_MAGIC, 4);
+}
+
+FileSerializer::FileSerializer(
+    std::shared_ptr<OutputStream> sink,
+    std::shared_ptr<GroupNode>& schema,
+    MemoryAllocator* allocator = default_allocator()) :
+        sink_(sink), allocator_(allocator),
+        num_row_groups_(0), num_rows_(0) {
+  schema_.Init(schema);
+  StartFile();
+}
+
+void FileSerializer::StartFile() {
+  // Parquet files always start with PAR1
+  sink_->Write(PARQUET_MAGIC, 4);
+}
+
+} // namespace parquet

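`SerializedPageWriter::WriteDataPage()` lays out an uncompressed v1 data page body as repetition levels, then definition levels, then the encoded values. It adds the serialized page header size to both chunk-level size totals. The sketch below isolates that bookkeeping without Thrift or parquet-cpp. `ChunkCounters`, `ConcatPageBody` and `AccountPage` are hypothetical names, and compression is left out, as in this commit.

```cpp
#include <cassert>
#include <cstdint>
#include <vector>

// Hypothetical stand-in for the size/count fields of Thrift's ColumnMetaData.
struct ChunkCounters {
  int64_t total_uncompressed_size = 0;
  int64_t total_compressed_size = 0;
  int64_t num_values = 0;
};

// Uncompressed v1 data page body: repetition levels, then definition
// levels, then the encoded values.
std::vector<uint8_t> ConcatPageBody(const std::vector<uint8_t>& rep_levels,
                                    const std::vector<uint8_t>& def_levels,
                                    const std::vector<uint8_t>& values) {
  std::vector<uint8_t> body;
  body.reserve(rep_levels.size() + def_levels.size() + values.size());
  body.insert(body.end(), rep_levels.begin(), rep_levels.end());
  body.insert(body.end(), def_levels.begin(), def_levels.end());
  body.insert(body.end(), values.begin(), values.end());
  return body;
}

// Chunk totals include each page's serialized header; num_values counts
// values (including nulls), not rows.
void AccountPage(ChunkCounters* c, int64_t header_size, int64_t uncompressed,
                 int64_t compressed, int32_t num_values) {
  assert(header_size >= 0 && uncompressed >= 0 && compressed >= 0);
  c->total_uncompressed_size += uncompressed + header_size;
  c->total_compressed_size += compressed + header_size;
  c->num_values += num_values;
}
```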
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
new file mode 100644
index 0000000..e88348b
--- /dev/null
+++ b/src/parquet/file/writer-internal.h
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_FILE_WRITER_INTERNAL_H
+#define PARQUET_FILE_WRITER_INTERNAL_H
+
+#include <memory>
+#include <vector>
+
+#include "parquet/column/page.h"
+#include "parquet/compression/codec.h"
+#include "parquet/file/writer.h"
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet {
+
+// This PageWriter writes pages to a serialized stream, each preceded by a
+// serialized Thrift format::PageHeader describing the type of the page and
+// its metadata.
+//
+// TODO: Currently only writes DataPage pages.
+class SerializedPageWriter : public PageWriter {
+ public:
+  SerializedPageWriter(OutputStream* sink,
+      Compression::type codec, format::ColumnChunk* metadata,
+      MemoryAllocator* allocator = default_allocator());
+
+  virtual ~SerializedPageWriter() {}
+
+  // TODO: Refactor so that this just takes a DataPage instance.
+  // For this we need to decide how to handle num_rows and num_values.
+  int64_t WriteDataPage(int32_t num_rows, int32_t num_values,
+      const std::shared_ptr<Buffer>& definition_levels,
+      Encoding::type definition_level_encoding,
+      const std::shared_ptr<Buffer>& repetition_levels,
+      Encoding::type repetition_level_encoding,
+      const std::shared_ptr<Buffer>& values, Encoding::type encoding) override;
+
+  void Close() override;
+
+ private:
+  OutputStream* sink_;
+  format::ColumnChunk* metadata_;
+  MemoryAllocator* allocator_;
+
+  // Compression codec to use.
+  std::unique_ptr<Codec> compressor_;
+  OwnedMutableBuffer compression_buffer_;
+
+  void AddEncoding(Encoding::type encoding);
+};
+
+// RowGroupWriter::Contents implementation for the Parquet file specification
+class RowGroupSerializer : public RowGroupWriter::Contents {
+ public:
+  RowGroupSerializer(int64_t num_rows,
+      const SchemaDescriptor* schema,
+      OutputStream* sink,
+      format::RowGroup* metadata,
+      MemoryAllocator* allocator) :
+      num_rows_(num_rows), schema_(schema), sink_(sink),
+      metadata_(metadata),
+      allocator_(allocator),
+      total_bytes_written_(0),
+      current_column_index_(-1) {
+    metadata_->__set_num_rows(num_rows_);
+    metadata_->columns.resize(schema->num_columns());
+  }
+
+  int num_columns() const override;
+  int64_t num_rows() const override;
+  const SchemaDescriptor* schema() const override;
+
+  // TODO: PARQUET-579
+  // void WriteRowGroupStatistics() override;
+
+  ColumnWriter* NextColumn() override;
+  void Close() override;
+
+ private:
+  int64_t num_rows_;
+  const SchemaDescriptor* schema_;
+  OutputStream* sink_;
+  format::RowGroup* metadata_;
+  MemoryAllocator* allocator_;
+  int64_t total_bytes_written_;
+
+  int64_t current_column_index_;
+  std::shared_ptr<ColumnWriter> current_column_writer_;
+};
+
+// An implementation of ParquetFileWriter::Contents that deals with the Parquet
+// file structure, Thrift serialization, and other internal matters
+
+class FileSerializer : public ParquetFileWriter::Contents {
+ public:
+  static std::unique_ptr<ParquetFileWriter::Contents> Open(
+      std::shared_ptr<OutputStream> sink,
+      std::shared_ptr<schema::GroupNode>& schema,
+      MemoryAllocator* allocator = default_allocator());
+
+  void Close() override;
+
+  RowGroupWriter* AppendRowGroup(int64_t num_rows) override;
+
+  int num_columns() const override;
+  int num_row_groups() const override;
+  int64_t num_rows() const override;
+
+  virtual ~FileSerializer();
+
+ private:
+  explicit FileSerializer(std::shared_ptr<OutputStream> sink,
+      std::shared_ptr<schema::GroupNode>& schema,
+      MemoryAllocator* allocator);
+
+  std::shared_ptr<OutputStream> sink_;
+  format::FileMetaData metadata_;
+  std::vector<format::RowGroup> row_group_metadata_;
+  MemoryAllocator* allocator_;
+  int num_row_groups_;
+  int64_t num_rows_;
+  std::unique_ptr<RowGroupWriter> row_group_writer_;
+
+  void StartFile();
+  void WriteMetaData();
+};
+
+} // namespace parquet
+
+#endif // PARQUET_FILE_WRITER_INTERNAL_H

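`RowGroupSerializer` keeps a `current_column_index_` cursor that starts at -1. `NextColumn()` refuses to advance past the last column, and `Close()` throws unless every column has been started. Below is a library-free model of that cursor. `ColumnCursor` is a hypothetical name, and `std::runtime_error` stands in for `ParquetException`.

```cpp
#include <cassert>
#include <stdexcept>

// Columns are visited strictly in schema order; Close() only succeeds once
// the last column has been started.
class ColumnCursor {
 public:
  explicit ColumnCursor(int num_columns) : num_columns_(num_columns) {
    assert(num_columns > 0);
  }

  // Advance to the next column and return its index.
  int Next() {
    if (current_ == num_columns_ - 1) {
      throw std::runtime_error("All columns have already been written.");
    }
    return ++current_;
  }

  void Close() const {
    if (current_ != num_columns_ - 1) {
      throw std::runtime_error("Not all columns were written in the current row group.");
    }
  }

 private:
  int num_columns_;
  int current_ = -1;  // -1 means no column has been started yet
};
```

Because the cursor only moves forward, a column writer handed out by `NextColumn()` cannot be revisited once the next column starts, which matches the lifetime rule documented on `RowGroupWriter::NextColumn()`.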
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/file/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
new file mode 100644
index 0000000..6e9f4f9
--- /dev/null
+++ b/src/parquet/file/writer.cc
@@ -0,0 +1,83 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file/writer.h"
+
+#include "parquet/file/writer-internal.h"
+#include "parquet/util/output.h"
+
+using parquet::schema::GroupNode;
+
+namespace parquet {
+
+// ----------------------------------------------------------------------
+// RowGroupWriter public API
+
+RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents,
+    MemoryAllocator* allocator):
+  contents_(std::move(contents)), allocator_(allocator) {
+  schema_ = contents_->schema();
+}
+
+void RowGroupWriter::Close() {
+  if (contents_) {
+    contents_->Close();
+    contents_.reset();
+  }
+}
+
+ColumnWriter* RowGroupWriter::NextColumn() {
+  return contents_->NextColumn();
+}
+
+// ----------------------------------------------------------------------
+// ParquetFileWriter public API
+
+ParquetFileWriter::ParquetFileWriter() {}
+
+ParquetFileWriter::~ParquetFileWriter() {
+  Close();
+}
+
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+    std::shared_ptr<OutputStream> sink, std::shared_ptr<GroupNode>& schema,
+    MemoryAllocator* allocator) {
+  auto contents = FileSerializer::Open(sink, schema, allocator);
+
+  std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
+  result->Open(std::move(contents));
+
+  return result;
+}
+
+void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
+  contents_ = std::move(contents);
+  schema_ = contents_->schema();
+}
+
+void ParquetFileWriter::Close() {
+  if (contents_) {
+    contents_->Close();
+    contents_.reset();
+  }
+}
+
+RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
+  return contents_->AppendRowGroup(num_rows);
+}
+
+} // namespace parquet

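`ParquetFileWriter::Close()` closes its contents and then destroys them, and `~FileSerializer()` calls `Close()` as well. `FileSerializer::Close()` can therefore run twice and must not write the footer a second time. One common way to make such a method idempotent is an "is open" flag. The sketch below shows that pattern with hypothetical `RecordingSink` and `IdempotentWriter` types; it is not parquet-cpp code.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <utility>

// Hypothetical sink that records what was written and how often it was closed.
struct RecordingSink {
  std::string data;
  int close_calls = 0;
};

class IdempotentWriter {
 public:
  explicit IdempotentWriter(std::shared_ptr<RecordingSink> sink) : sink_(std::move(sink)) {
    assert(sink_ != nullptr);
    sink_->data += "PAR1";  // leading magic bytes
  }
  IdempotentWriter(const IdempotentWriter&) = delete;
  IdempotentWriter& operator=(const IdempotentWriter&) = delete;

  // The destructor closes implicitly, so an explicit Close() must be safe to repeat.
  ~IdempotentWriter() { Close(); }

  void Close() {
    if (!is_open_) return;  // second and later calls do nothing
    sink_->data += "<footer>PAR1";
    ++sink_->close_calls;
    is_open_ = false;
  }

 private:
  std::shared_ptr<RecordingSink> sink_;
  bool is_open_ = true;
};
```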
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
new file mode 100644
index 0000000..9ad2539
--- /dev/null
+++ b/src/parquet/file/writer.h
@@ -0,0 +1,167 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_FILE_WRITER_H
+#define PARQUET_FILE_WRITER_H
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/schema/descriptor.h"
+#include "parquet/schema/types.h"
+#include "parquet/util/mem-allocator.h"
+
+namespace parquet {
+
+class ColumnWriter;
+class PageWriter;
+class OutputStream;
+
+class RowGroupWriter {
+ public:
+  struct Contents {
+    virtual ~Contents() {}
+    virtual int num_columns() const = 0;
+    virtual int64_t num_rows() const = 0;
+    // TODO: PARQUET-579
+    // virtual void WriteRowGroupStatistics();
+    virtual ColumnWriter* NextColumn() = 0;
+    virtual void Close() = 0;
+
+    // Return const-pointer to make it clear that this object is not to be copied
+    virtual const SchemaDescriptor* schema() const = 0;
+  };
+
+  RowGroupWriter(std::unique_ptr<Contents> contents, MemoryAllocator* allocator);
+
+  /**
+   * Construct a ColumnWriter for the indicated row group-relative column.
+   *
+   * Ownership is solely within the RowGroupWriter. The ColumnWriter is only valid
+   * until the next call to NextColumn or Close. As the contents are directly written to
+   * the sink, once a new column is started, the contents of the previous one cannot be
+   * modified anymore.
+   */
+  ColumnWriter* NextColumn();
+  void Close();
+
+  int num_columns() const;
+
+  /**
+   * Number of rows that shall be written as part of this RowGroup.
+   */
+  int64_t num_rows() const;
+
+  // TODO: PARQUET-579
+  // virtual void WriteRowGroupStatistics();
+
+ private:
+  // Owned by the parent ParquetFileWriter
+  const SchemaDescriptor* schema_;
+
+  // This is declared in the .cc file so that we can hide compiled Thrift
+  // headers from the public API and also more easily create test fixtures.
+  std::unique_ptr<Contents> contents_;
+
+  MemoryAllocator* allocator_;
+};
+
+class ParquetFileWriter {
+ public:
+  struct Contents {
+    virtual ~Contents() {}
+    // Perform any cleanup associated with the file contents
+    virtual void Close() = 0;
+
+    virtual RowGroupWriter* AppendRowGroup(int64_t num_rows) = 0;
+
+    virtual int64_t num_rows() const = 0;
+    virtual int num_columns() const = 0;
+    virtual int num_row_groups() const = 0;
+
+    // Return const-pointer to make it clear that this object is not to be copied
+    const SchemaDescriptor* schema() const {
+       return &schema_;
+    }
+    SchemaDescriptor schema_;
+  };
+
+  ParquetFileWriter();
+  ~ParquetFileWriter();
+
+  static std::unique_ptr<ParquetFileWriter> Open(
+      std::shared_ptr<OutputStream> sink,
+      std::shared_ptr<schema::GroupNode>& schema,
+      MemoryAllocator* allocator = default_allocator());
+
+  void Open(std::unique_ptr<Contents> contents);
+  void Close();
+
+  /**
+   * Construct a RowGroupWriter for the indicated number of rows.
+   *
+   * Ownership is solely within the ParquetFileWriter. The RowGroupWriter is only valid
+   * until the next call to AppendRowGroup or Close.
+   *
+   * @param num_rows The number of rows that are stored in the new RowGroup
+   */
+  RowGroupWriter* AppendRowGroup(int64_t num_rows);
+
+  /**
+   * Number of columns.
+   *
+   * This number is fixed during the lifetime of the writer as it is determined via
+   * the schema.
+   */
+  int num_columns() const;
+
+  /**
+   * Total number of rows in all RowGroups started so far.
+   *
+   * Changes on the addition of a new RowGroup.
+   */
+  int64_t num_rows() const;
+
+  /**
+   * Number of started RowGroups.
+   */
+  int num_row_groups() const;
+
+  /**
+   * Returns the file schema descriptor
+   */
+  const SchemaDescriptor* descr() {
+    return schema_;
+  }
+
+  const ColumnDescriptor* column_schema(int i) const {
+    return schema_->Column(i);
+  }
+
+ private:
+  // This is declared in the .cc file so that we can hide compiled Thrift
+  // headers from the public API and also more easily create test fixtures.
+  std::unique_ptr<Contents> contents_;
+
+  // The SchemaDescriptor is provided by the Contents impl
+  const SchemaDescriptor* schema_;
+};
+
+} // namespace parquet
+
+#endif // PARQUET_FILE_WRITER_H
+

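`writer.h` separates the public `ParquetFileWriter` and `RowGroupWriter` classes from abstract `Contents` interfaces whose implementations (`FileSerializer`, `RowGroupSerializer`) live in the internal files. This keeps Thrift types out of the public header. The library-free sketch below shows the same facade-plus-interface pattern, including how `num_rows()` accumulates over every row group started so far, as `FileSerializer::AppendRowGroup()` does. `WriterContents`, `CountingContents` and `WriterFacade` are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <memory>
#include <utility>

// Abstract implementation interface. The virtual destructor matters because
// the facade destroys the concrete object through a base-class pointer.
struct WriterContents {
  virtual ~WriterContents() {}
  virtual void AppendRowGroup(int64_t num_rows) = 0;
  virtual int64_t num_rows() const = 0;
  virtual int num_row_groups() const = 0;
};

// Concrete implementation that only tracks counters.
class CountingContents : public WriterContents {
 public:
  void AppendRowGroup(int64_t num_rows) override {
    num_rows_ += num_rows;  // total over all row groups started so far
    ++num_row_groups_;
  }
  int64_t num_rows() const override { return num_rows_; }
  int num_row_groups() const override { return num_row_groups_; }

 private:
  int64_t num_rows_ = 0;
  int num_row_groups_ = 0;
};

// Public facade: forwards every call to the Contents implementation it owns.
class WriterFacade {
 public:
  explicit WriterFacade(std::unique_ptr<WriterContents> contents)
      : contents_(std::move(contents)) {
    assert(contents_ != nullptr);
  }
  void AppendRowGroup(int64_t num_rows) { contents_->AppendRowGroup(num_rows); }
  int64_t num_rows() const { return contents_->num_rows(); }
  int num_row_groups() const { return contents_->num_row_groups(); }

 private:
  std::unique_ptr<WriterContents> contents_;
};
```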
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/schema/types.cc
----------------------------------------------------------------------
diff --git a/src/parquet/schema/types.cc b/src/parquet/schema/types.cc
index f08fb41..e5b7ed2 100644
--- a/src/parquet/schema/types.cc
+++ b/src/parquet/schema/types.cc
@@ -62,6 +62,10 @@ std::string ColumnPath::ToDotString() const {
   return ss.str();
 }
 
+const std::vector<std::string>& ColumnPath::ToDotVector() const {
+  return path_;
+}
+
 // ----------------------------------------------------------------------
 // Base node
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5fb7d20a/src/parquet/schema/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema/types.h b/src/parquet/schema/types.h
index 4131d24..452c23a 100644
--- a/src/parquet/schema/types.h
+++ b/src/parquet/schema/types.h
@@ -88,6 +88,7 @@ class ColumnPath {
 
   std::shared_ptr<ColumnPath> extend(const std::string& node_name) const;
   std::string ToDotString() const;
+  const std::vector<std::string>& ToDotVector() const;
 
  protected:
   std::vector<std::string> path_;

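`ToDotVector()` exposes the column path as its individual components. That is the shape `RowGroupSerializer::NextColumn()` needs for Thrift's `path_in_schema`, a list of strings, while `ToDotString()` produces a single human-readable string. Below is a simplified, hypothetical `ColumnPathSketch` that illustrates the two views. Joining the components with `.` is an assumption about `ToDotString()`'s output format.

```cpp
#include <cassert>
#include <sstream>
#include <string>
#include <utility>
#include <vector>

class ColumnPathSketch {
 public:
  explicit ColumnPathSketch(std::vector<std::string> path) : path_(std::move(path)) {
    assert(!path_.empty());  // a leaf column path has at least one component
  }

  // Single string, e.g. "a.b.c".
  std::string ToDotString() const {
    std::stringstream ss;
    for (std::size_t i = 0; i < path_.size(); ++i) {
      if (i > 0) ss << ".";
      ss << path_[i];
    }
    return ss.str();
  }

  // Component-wise view. Returns a reference, so it is only valid while
  // this object is alive.
  const std::vector<std::string>& ToDotVector() const { return path_; }

 private:
  std::vector<std::string> path_;
};
```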
