Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id E3862200B94 for ; Sun, 2 Oct 2016 20:52:08 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id E2082160AD8; Sun, 2 Oct 2016 18:52:08 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CE101160AC4 for ; Sun, 2 Oct 2016 20:52:07 +0200 (CEST) Received: (qmail 70627 invoked by uid 500); 2 Oct 2016 18:52:06 -0000 Mailing-List: contact commits-help@parquet.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.apache.org Delivered-To: mailing list commits@parquet.apache.org Received: (qmail 70618 invoked by uid 99); 2 Oct 2016 18:52:06 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Sun, 02 Oct 2016 18:52:06 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id BE2D0E008F; Sun, 2 Oct 2016 18:52:06 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wesm@apache.org To: commits@parquet.apache.org Message-Id: <1effff3db3794035be24f257e2cc1c6b@git.apache.org> X-Mailer: ASF-Git Admin Mailer Subject: parquet-cpp git commit: PARQUET-731: API to return metadata size and Skip reading values Date: Sun, 2 Oct 2016 18:52:06 +0000 (UTC) archived-at: Sun, 02 Oct 2016 18:52:09 -0000 Repository: parquet-cpp Updated Branches: refs/heads/master 43d949137 -> 7abb9c476 PARQUET-731: API to return metadata size and Skip reading values Author: Deepak Majeti Closes #169 from majetideepak/PARQUET-731 and squashes the following commits: 15e539f [Deepak Majeti] use allocator and smaller memory footprint 3504edd [Deepak Majeti] clang format 25a4bc1 [Deepak Majeti] Added tests 1aeb8f5 [Deepak Majeti] API to skip values 343af37 [Deepak Majeti] Added API to get metadata size Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/7abb9c47 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/7abb9c47 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/7abb9c47 Branch: refs/heads/master Commit: 7abb9c47671d2406b38e724218810e492143cd24 Parents: 43d9491 Author: Deepak Majeti Authored: Sun Oct 2 14:51:59 2016 -0400 Committer: Wes McKinney Committed: Sun Oct 2 14:51:59 2016 -0400 ---------------------------------------------------------------------- .../arrow/arrow-reader-writer-benchmark.cc | 4 +- src/parquet/column/column-reader-test.cc | 58 ++++++++++++++++++++ src/parquet/column/reader.h | 44 ++++++++++++++- src/parquet/file/file-metadata-test.cc | 1 + src/parquet/file/metadata.cc | 12 +++- src/parquet/file/metadata.h | 1 + 6 files changed, 113 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/arrow/arrow-reader-writer-benchmark.cc ---------------------------------------------------------------------- diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc index 5f0b3f5..9ce5f96 100644 --- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc +++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc @@ -19,10 +19,10 @@ #include "parquet/arrow/reader.h" #include "parquet/arrow/writer.h" -#include "parquet/file/reader-internal.h" -#include "parquet/file/writer-internal.h" #include "parquet/column/reader.h" #include "parquet/column/writer.h" +#include "parquet/file/reader-internal.h" +#include "parquet/file/writer-internal.h" #include "parquet/util/input.h" #include "arrow/column.h" http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/column/column-reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc index ee29f4b..df45e00 100644 --- a/src/parquet/column/column-reader-test.cc +++ b/src/parquet/column/column-reader-test.cc @@ -151,6 +151,64 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) { ExecuteDict(num_pages, levels_per_page, &descr); } +TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) { + int levels_per_page = 100; + int num_pages = 5; + max_def_level_ = 0; + max_rep_level_ = 0; + NodePtr type = schema::Int32("b", Repetition::REQUIRED); + const ColumnDescriptor descr(type, max_def_level_, max_rep_level_); + MakePages(&descr, num_pages, levels_per_page, def_levels_, rep_levels_, + values_, data_buffer_, pages_, Encoding::PLAIN); + InitReader(&descr); + vector vresult(levels_per_page / 2, -1); + vector dresult(levels_per_page / 2, -1); + vector rresult(levels_per_page / 2, -1); + + Int32Reader* reader = static_cast(reader_.get()); + int64_t values_read = 0; + + // 1) skip_size > page_size (multiple pages skipped) + // Skip first 2 pages + int64_t levels_skipped = reader->Skip(2 * levels_per_page); + ASSERT_EQ(2 * levels_per_page, levels_skipped); + // Read half a page + reader->ReadBatch( + levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read); + vector sub_values( + values_.begin() + 2 * levels_per_page, values_.begin() + 2.5 * levels_per_page); + ASSERT_TRUE(vector_equal(sub_values, vresult)); + + // 2) skip_size == page_size (skip across two pages) + levels_skipped = reader->Skip(levels_per_page); + ASSERT_EQ(levels_per_page, levels_skipped); + // Read half a page + reader->ReadBatch( + levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read); + sub_values.clear(); + sub_values.insert(sub_values.end(), values_.begin() + 3.5 * levels_per_page, + values_.begin() + 4 * levels_per_page); + ASSERT_TRUE(vector_equal(sub_values, vresult)); + + // 3) skip_size < page_size (skip limited to a single page) + // Skip half a page + levels_skipped = reader->Skip(levels_per_page / 2); + ASSERT_EQ(0.5 * levels_per_page, levels_skipped); + // Read half a page + reader->ReadBatch( + levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read); + sub_values.clear(); + sub_values.insert( + sub_values.end(), values_.begin() + 4.5 * levels_per_page, values_.end()); + ASSERT_TRUE(vector_equal(sub_values, vresult)); + + values_.clear(); + def_levels_.clear(); + rep_levels_.clear(); + pages_.clear(); + reader_.reset(); +} + TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) { max_def_level_ = 0; max_rep_level_ = 0; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/column/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h index 8243e85..5608192 100644 --- a/src/parquet/column/reader.h +++ b/src/parquet/column/reader.h @@ -23,6 +23,7 @@ #include #include #include +#include #include "parquet/column/levels.h" #include "parquet/column/page.h" @@ -124,8 +125,12 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader { // This API is the same for both V1 and V2 of the DataPage // // @returns: actual number of levels read (see values_read for number of values read) - int64_t ReadBatch(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels, - T* values, int64_t* values_read); + int64_t ReadBatch(int batch_size, int16_t* def_levels, int16_t* rep_levels, T* values, + int64_t* values_read); + + // Skip reading levels + // Returns the number of levels skipped + int64_t Skip(int64_t num_rows_to_skip); private: typedef Decoder DecoderType; @@ -166,7 +171,7 @@ inline int64_t TypedColumnReader::ReadBatch(int batch_size, int16_t* def_ // TODO(wesm): keep reading data pages until batch_size is reached, or the // row group is finished - batch_size = std::min(batch_size, num_buffered_values_); + batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_); int64_t num_def_levels = 0; int64_t num_rep_levels = 0; @@ -201,6 +206,39 @@ inline int64_t TypedColumnReader::ReadBatch(int batch_size, int16_t* def_ return total_values; } +template +inline int64_t TypedColumnReader::Skip(int64_t num_rows_to_skip) { + int64_t rows_to_skip = num_rows_to_skip; + while (HasNext() && rows_to_skip > 0) { + // If the number of rows to skip is more than the number of undecoded values, skip the + // Page. + if (rows_to_skip > (num_buffered_values_ - num_decoded_values_)) { + rows_to_skip -= num_buffered_values_ - num_decoded_values_; + num_decoded_values_ = num_buffered_values_; + } else { + // We need to read this Page + // Jump to the right offset in the Page + int64_t batch_size = 1024; // ReadBatch with a smaller memory footprint + int64_t values_read = 0; + auto vals = std::make_shared( + batch_size * type_traits::value_byte_size, this->allocator_); + auto def_levels = std::make_shared( + batch_size * sizeof(int16_t), this->allocator_); + auto rep_levels = std::make_shared( + batch_size * sizeof(int16_t), this->allocator_); + do { + batch_size = std::min(batch_size, rows_to_skip); + values_read = + ReadBatch(batch_size, reinterpret_cast(def_levels->mutable_data()), + reinterpret_cast(rep_levels->mutable_data()), + reinterpret_cast(vals->mutable_data()), &values_read); + rows_to_skip -= values_read; + } while (values_read > 0 && rows_to_skip > 0); + } + } + return num_rows_to_skip - rows_to_skip; +} + typedef TypedColumnReader BoolReader; typedef TypedColumnReader Int32Reader; typedef TypedColumnReader Int64Reader; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/file/file-metadata-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc index 576db43..7bb2ae5 100644 --- a/src/parquet/file/file-metadata-test.cc +++ b/src/parquet/file/file-metadata-test.cc @@ -83,6 +83,7 @@ TEST(Metadata, TestBuildAccess) { // file metadata ASSERT_EQ(nrows, f_accessor->num_rows()); + ASSERT_LE(0, f_accessor->size()); ASSERT_EQ(2, f_accessor->num_row_groups()); ASSERT_EQ(DEFAULT_WRITER_VERSION, f_accessor->version()); ASSERT_EQ(DEFAULT_CREATED_BY, f_accessor->created_by()); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/file/metadata.cc ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc index 9964882..36098c5 100644 --- a/src/parquet/file/metadata.cc +++ b/src/parquet/file/metadata.cc @@ -229,15 +229,18 @@ std::unique_ptr RowGroupMetaData::ColumnChunk(int i) const // file metadata class FileMetaData::FileMetaDataImpl { public: - FileMetaDataImpl() {} + FileMetaDataImpl() : metadata_len_(0) {} - explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) { + explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) + : metadata_len_(0) { metadata_.reset(new format::FileMetaData); DeserializeThriftMsg(metadata, metadata_len, metadata_.get()); + metadata_len_ = *metadata_len; InitSchema(); } ~FileMetaDataImpl() {} + inline uint32_t size() const { return metadata_len_; } inline int num_columns() const { return schema_.num_columns(); } inline int64_t num_rows() const { return metadata_->num_rows; } inline int num_row_groups() const { return metadata_->row_groups.size(); } @@ -262,6 +265,7 @@ class FileMetaData::FileMetaDataImpl { private: friend FileMetaDataBuilder; + uint32_t metadata_len_; std::unique_ptr metadata_; void InitSchema() { schema::FlatSchemaConverter converter( @@ -289,6 +293,10 @@ std::unique_ptr FileMetaData::RowGroup(int i) const { return impl_->RowGroup(i); } +uint32_t FileMetaData::size() const { + return impl_->size(); +} + int FileMetaData::num_columns() const { return impl_->num_columns(); } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/file/metadata.h ---------------------------------------------------------------------- diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h index b36ced8..5b8115b 100644 --- a/src/parquet/file/metadata.h +++ b/src/parquet/file/metadata.h @@ -106,6 +106,7 @@ class PARQUET_EXPORT FileMetaData { ~FileMetaData(); // file metadata + uint32_t size() const; int num_columns() const; int64_t num_rows() const; int num_row_groups() const;