parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-691: Write ColumnChunk metadata after chunk is complete
Date Tue, 24 Jan 2017 20:26:20 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master f9ff60797 -> 248094206


PARQUET-691: Write ColumnChunk metadata after chunk is complete

See corresponding logic in Impala's Parquet implementation: https://github.com/apache/incubator-impala/blob/master/be/src/exec/hdfs-parquet-table-writer.cc#L973

Author: Wes McKinney <wes.mckinney@twosigma.com>

Closes #224 from wesm/PARQUET-691 and squashes the following commits:

0387d43 [Wes McKinney] Write ColumnChunk metadata after chunk is complete


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/24809420
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/24809420
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/24809420

Branch: refs/heads/master
Commit: 2480942067bf078220745a733a65df55c572790a
Parents: f9ff607
Author: Wes McKinney <wes.mckinney@twosigma.com>
Authored: Tue Jan 24 15:26:14 2017 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Tue Jan 24 15:26:14 2017 -0500

----------------------------------------------------------------------
 src/parquet/file/metadata.cc        |  8 ++++++++
 src/parquet/file/metadata.h         |  3 +++
 src/parquet/file/writer-internal.cc | 12 ++++++++----
 src/parquet/file/writer-internal.h  |  3 ---
 src/parquet/thrift/util.h           |  3 ++-
 5 files changed, 21 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/24809420/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index a262b63..1545efe 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -501,6 +501,10 @@ class ColumnChunkMetaDataBuilder::ColumnChunkMetaDataBuilderImpl {
     column_chunk_->meta_data.__set_encodings(thrift_encodings);
   }
 
+  void WriteTo(OutputStream* sink) {
+    SerializeThriftMsg(column_chunk_, sizeof(format::ColumnChunk), sink);
+  }
+
   const ColumnDescriptor* descr() const { return column_; }
 
  private:
@@ -536,6 +540,10 @@ void ColumnChunkMetaDataBuilder::Finish(int64_t num_values,
       compressed_size, uncompressed_size, has_dictionary, dictionary_fallback);
 }
 
+void ColumnChunkMetaDataBuilder::WriteTo(OutputStream* sink) {
+  impl_->WriteTo(sink);
+}
+
 const ColumnDescriptor* ColumnChunkMetaDataBuilder::descr() const {
   return impl_->descr();
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/24809420/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index 942aa39..7307ddf 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -174,6 +174,9 @@ class PARQUET_EXPORT ColumnChunkMetaDataBuilder {
       int64_t index_page_offset, int64_t data_page_offset, int64_t compressed_size,
       int64_t uncompressed_size, bool has_dictionary, bool dictionary_fallback);
 
+  // For writing metadata at end of column chunk
+  void WriteTo(OutputStream* sink);
+
  private:
   explicit ColumnChunkMetaDataBuilder(const std::shared_ptr<WriterProperties>& props,
       const ColumnDescriptor* column, uint8_t* contents);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/24809420/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 48884ad..724635c 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -62,6 +62,9 @@ void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
   // TODO: Remove default fallback = 'false' when implemented
   metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
       total_compressed_size_, total_uncompressed_size_, has_dictionary, fallback);
+
+  // Write metadata at end of column chunk
+  metadata_->WriteTo(sink_);
 }
 
 std::shared_ptr<Buffer> SerializedPageWriter::Compress(
@@ -104,8 +107,9 @@ int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {
 
   int64_t start_pos = sink_->Tell();
   if (data_page_offset_ == 0) { data_page_offset_ = start_pos; }
-  SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
-  int64_t header_size = sink_->Tell() - start_pos;
+
+  int64_t header_size =
+      SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
   sink_->Write(compressed_data->data(), compressed_data->size());
 
   total_uncompressed_size_ += uncompressed_size + header_size;
@@ -133,8 +137,8 @@ int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
 
   int64_t start_pos = sink_->Tell();
   if (dictionary_page_offset_ == 0) { dictionary_page_offset_ = start_pos; }
-  SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
-  int64_t header_size = sink_->Tell() - start_pos;
+  int64_t header_size =
+      SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
   sink_->Write(compressed_data->data(), compressed_data->size());
 
   total_uncompressed_size_ += uncompressed_size + header_size;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/24809420/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 81a0837..12002e1 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -81,9 +81,6 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
   int num_columns() const override;
   int64_t num_rows() const override;
 
-  // TODO: PARQUET-579
-  // void WriteRowGroupStatitics() override;
-
   ColumnWriter* NextColumn() override;
   void Close() override;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/24809420/src/parquet/thrift/util.h
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h
index 9d2b66f..30d7edf 100644
--- a/src/parquet/thrift/util.h
+++ b/src/parquet/thrift/util.h
@@ -118,7 +118,7 @@ inline void DeserializeThriftMsg(const uint8_t* buf, uint32_t* len, T* deseriali
 // The arguments are the object to be serialized and
 // the expected size of the serialized object
 template <class T>
-inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
+inline int64_t SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
   boost::shared_ptr<apache::thrift::transport::TMemoryBuffer> mem_buffer(
       new apache::thrift::transport::TMemoryBuffer(len));
   apache::thrift::protocol::TCompactProtocolFactoryT<
@@ -139,6 +139,7 @@ inline void SerializeThriftMsg(T* obj, uint32_t len, OutputStream* out) {
   uint32_t out_length;
   mem_buffer->getBuffer(&out_buffer, &out_length);
   out->Write(out_buffer, out_length);
+  return out_length;
 }
 
 }  // namespace parquet


Mime
View raw message