Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id 4EF99200B82 for ; Fri, 16 Sep 2016 09:01:38 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id 4D75C160AC4; Fri, 16 Sep 2016 07:01:38 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id 6D422160A8C for ; Fri, 16 Sep 2016 09:01:37 +0200 (CEST) Received: (qmail 19181 invoked by uid 500); 16 Sep 2016 07:01:36 -0000 Mailing-List: contact commits-help@parquet.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.apache.org Delivered-To: mailing list commits@parquet.apache.org Received: (qmail 19172 invoked by uid 99); 16 Sep 2016 07:01:36 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Fri, 16 Sep 2016 07:01:36 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id 6C49EDFE80; Fri, 16 Sep 2016 07:01:36 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: uwe@apache.org To: commits@parquet.apache.org Message-Id: <9edcbc1573e84abaa256926993736376@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: parquet-cpp git commit: PARQUET-689: C++: Compress DataPages eagerly Date: Fri, 16 Sep 2016 07:01:36 +0000 (UTC) archived-at: Fri, 16 Sep 2016 07:01:38 -0000 Repository: parquet-cpp Updated Branches: refs/heads/master 942f2aedb -> ffeb828ac PARQUET-689: C++: Compress DataPages eagerly Author: Deepak Majeti Closes #162 from majetideepak/PARQUET-689 and squashes the following commits: 46f04fb [Deepak Majeti] Clang format 73dfcf9 [Deepak Majeti] Compress 
Data Pages early Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/ffeb828a Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/ffeb828a Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/ffeb828a Branch: refs/heads/master Commit: ffeb828ac5bf19abe5990a2be9245a8fdd292c7a Parents: 942f2ae Author: Deepak Majeti Authored: Fri Sep 16 09:01:00 2016 +0200 Committer: Uwe L. Korn Committed: Fri Sep 16 09:01:00 2016 +0200 ---------------------------------------------------------------------- src/parquet/column/page.h | 19 ++++++++++++++++++- src/parquet/column/writer.cc | 8 +++++--- src/parquet/column/writer.h | 4 ++-- src/parquet/file/writer-internal.cc | 6 +++--- src/parquet/file/writer-internal.h | 18 +++++++++--------- 5 files changed, 37 insertions(+), 18 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/page.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h index c06d3de..1de6013 100644 --- a/src/parquet/column/page.h +++ b/src/parquet/column/page.h @@ -95,6 +95,21 @@ class DataPage : public Page { std::string min_; }; +class CompressedDataPage : public DataPage { + public: + CompressedDataPage(const std::shared_ptr& buffer, int32_t num_values, + Encoding::type encoding, Encoding::type definition_level_encoding, + Encoding::type repetition_level_encoding, int64_t uncompressed_size) + : DataPage(buffer, num_values, encoding, definition_level_encoding, + repetition_level_encoding), + uncompressed_size_(uncompressed_size) {} + + int64_t uncompressed_size() const { return uncompressed_size_; } + + private: + int64_t uncompressed_size_; +}; + class DataPageV2 : public Page { public: DataPageV2(const std::shared_ptr& buffer, int32_t num_values, int32_t num_nulls, @@ 
-176,9 +191,11 @@ class PageWriter { // page limit virtual void Close(bool has_dictionary, bool fallback) = 0; - virtual int64_t WriteDataPage(const DataPage& page) = 0; + virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0; virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0; + + virtual std::shared_ptr Compress(const std::shared_ptr& buffer) = 0; }; } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/writer.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc index 1fbea62..bfbd0c5 100644 --- a/src/parquet/column/writer.cc +++ b/src/parquet/column/writer.cc @@ -116,8 +116,10 @@ void ColumnWriter::AddDataPage() { memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size()); uncompressed_ptr += definition_levels->size(); memcpy(uncompressed_ptr, values->data(), values->size()); - DataPage page( - uncompressed_data, num_buffered_values_, encoding_, Encoding::RLE, Encoding::RLE); + + std::shared_ptr compressed_data = pager_->Compress(uncompressed_data); + CompressedDataPage page(compressed_data, num_buffered_values_, encoding_, Encoding::RLE, + Encoding::RLE, uncompressed_size); // Write the page to OutputStream eagerly if there is no dictionary or // if dictionary encoding has fallen back to PLAIN @@ -133,7 +135,7 @@ void ColumnWriter::AddDataPage() { num_buffered_encoded_values_ = 0; } -void ColumnWriter::WriteDataPage(const DataPage& page) { +void ColumnWriter::WriteDataPage(const CompressedDataPage& page) { int64_t bytes_written = pager_->WriteDataPage(page); total_bytes_written_ += bytes_written; } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/column/writer.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h index 4b2a021..3a54cbb 100644 --- 
a/src/parquet/column/writer.h +++ b/src/parquet/column/writer.h @@ -72,7 +72,7 @@ class PARQUET_EXPORT ColumnWriter { void AddDataPage(); // Serializes Data Pages - void WriteDataPage(const DataPage& page); + void WriteDataPage(const CompressedDataPage& page); // Write multiple definition levels void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels); @@ -128,7 +128,7 @@ class PARQUET_EXPORT ColumnWriter { std::unique_ptr definition_levels_sink_; std::unique_ptr repetition_levels_sink_; - std::vector<DataPage> data_pages_; + std::vector<CompressedDataPage> data_pages_; private: void InitSinks(); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/file/writer-internal.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc index 2d396b7..05aefb9 100644 --- a/src/parquet/file/writer-internal.cc +++ b/src/parquet/file/writer-internal.cc @@ -66,9 +66,9 @@ std::shared_ptr SerializedPageWriter::Compress( return compression_buffer_; } -int64_t SerializedPageWriter::WriteDataPage(const DataPage& page) { - int64_t uncompressed_size = page.size(); - std::shared_ptr compressed_data = Compress(page.buffer()); +int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) { + int64_t uncompressed_size = page.uncompressed_size(); + std::shared_ptr compressed_data = page.buffer(); format::DataPageHeader data_page_header; data_page_header.__set_num_values(page.num_values()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/ffeb828a/src/parquet/file/writer-internal.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h index e6364e9..2095154 100644 --- a/src/parquet/file/writer-internal.h +++ b/src/parquet/file/writer-internal.h @@ -40,10 +40,18 @@ class SerializedPageWriter : public PageWriter { virtual ~SerializedPageWriter() {} - int64_t 
WriteDataPage(const DataPage& page) override; + int64_t WriteDataPage(const CompressedDataPage& page) override; int64_t WriteDictionaryPage(const DictionaryPage& page) override; + /** + * Compress a buffer. + * + * This method may return compression_buffer_ and thus the resulting memory + * is only valid until the next call to Compress(). + */ + std::shared_ptr Compress(const std::shared_ptr& buffer) override; + void Close(bool has_dictionary, bool fallback) override; private: @@ -58,14 +66,6 @@ class SerializedPageWriter : public PageWriter { // Compression codec to use. std::unique_ptr compressor_; std::shared_ptr compression_buffer_; - - /** - * Compress a buffer. - * - * This method may return compression_buffer_ and thus the resulting memory - * is only valid until the next call to Compress(). - */ - std::shared_ptr Compress(const std::shared_ptr& buffer); }; // RowGroupWriter::Contents implementation for the Parquet file specification