parquet-commits mailing list archives

From w...@apache.org
Subject parquet-cpp git commit: PARQUET-731: API to return metadata size and Skip reading values
Date Sun, 02 Oct 2016 18:52:06 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 43d949137 -> 7abb9c476


PARQUET-731: API to return metadata size and Skip reading values

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes #169 from majetideepak/PARQUET-731 and squashes the following commits:

15e539f [Deepak Majeti] use allocator and smaller memory footprint
3504edd [Deepak Majeti] clang format
25a4bc1 [Deepak Majeti] Added tests
1aeb8f5 [Deepak Majeti] API to skip values
343af37 [Deepak Majeti] Added API to get metadata size
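
For reference, a minimal usage sketch of the new Skip() API (not part of this commit). The Int32Reader typedef and the Skip()/ReadBatch() signatures come from the diff below; how the ColumnReader is obtained, and the `parquet` namespace, are assumptions for illustration only.

    #include <cstdint>
    #include <memory>
    #include <vector>

    #include "parquet/column/reader.h"

    // `column_reader` is assumed to come from a row group reader elsewhere.
    void ReadAfterSkip(std::shared_ptr<parquet::ColumnReader> column_reader) {
      auto* reader = static_cast<parquet::Int32Reader*>(column_reader.get());

      // Skip the first 1000 levels; fewer are skipped if the column ends sooner.
      int64_t levels_skipped = reader->Skip(1000);

      // Read the next 100 values into caller-provided buffers.
      std::vector<int32_t> values(100);
      std::vector<int16_t> def_levels(100);
      std::vector<int16_t> rep_levels(100);
      int64_t values_read = 0;
      int64_t levels_read = reader->ReadBatch(
          100, def_levels.data(), rep_levels.data(), values.data(), &values_read);
      (void)levels_skipped;
      (void)levels_read;
    }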


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/7abb9c47
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/7abb9c47
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/7abb9c47

Branch: refs/heads/master
Commit: 7abb9c47671d2406b38e724218810e492143cd24
Parents: 43d9491
Author: Deepak Majeti <deepak.majeti@hpe.com>
Authored: Sun Oct 2 14:51:59 2016 -0400
Committer: Wes McKinney <wesm@apache.org>
Committed: Sun Oct 2 14:51:59 2016 -0400

----------------------------------------------------------------------
 .../arrow/arrow-reader-writer-benchmark.cc      |  4 +-
 src/parquet/column/column-reader-test.cc        | 58 ++++++++++++++++++++
 src/parquet/column/reader.h                     | 44 ++++++++++++++-
 src/parquet/file/file-metadata-test.cc          |  1 +
 src/parquet/file/metadata.cc                    | 12 +++-
 src/parquet/file/metadata.h                     |  1 +
 6 files changed, 113 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/arrow/arrow-reader-writer-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
index 5f0b3f5..9ce5f96 100644
--- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc
+++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc
@@ -19,10 +19,10 @@
 
 #include "parquet/arrow/reader.h"
 #include "parquet/arrow/writer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/writer-internal.h"
 #include "parquet/column/reader.h"
 #include "parquet/column/writer.h"
+#include "parquet/file/reader-internal.h"
+#include "parquet/file/writer-internal.h"
 #include "parquet/util/input.h"
 
 #include "arrow/column.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index ee29f4b..df45e00 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -151,6 +151,64 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
   ExecuteDict(num_pages, levels_per_page, &descr);
 }
 
+TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) {
+  int levels_per_page = 100;
+  int num_pages = 5;
+  max_def_level_ = 0;
+  max_rep_level_ = 0;
+  NodePtr type = schema::Int32("b", Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels_, rep_levels_,
+      values_, data_buffer_, pages_, Encoding::PLAIN);
+  InitReader(&descr);
+  vector<int32_t> vresult(levels_per_page / 2, -1);
+  vector<int16_t> dresult(levels_per_page / 2, -1);
+  vector<int16_t> rresult(levels_per_page / 2, -1);
+
+  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+  int64_t values_read = 0;
+
+  // 1) skip_size > page_size (multiple pages skipped)
+  // Skip first 2 pages
+  int64_t levels_skipped = reader->Skip(2 * levels_per_page);
+  ASSERT_EQ(2 * levels_per_page, levels_skipped);
+  // Read half a page
+  reader->ReadBatch(
+      levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
+  vector<int32_t> sub_values(
+      values_.begin() + 2 * levels_per_page, values_.begin() + 2.5 * levels_per_page);
+  ASSERT_TRUE(vector_equal(sub_values, vresult));
+
+  // 2) skip_size == page_size (skip across two pages)
+  levels_skipped = reader->Skip(levels_per_page);
+  ASSERT_EQ(levels_per_page, levels_skipped);
+  // Read half a page
+  reader->ReadBatch(
+      levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
+  sub_values.clear();
+  sub_values.insert(sub_values.end(), values_.begin() + 3.5 * levels_per_page,
+      values_.begin() + 4 * levels_per_page);
+  ASSERT_TRUE(vector_equal(sub_values, vresult));
+
+  // 3) skip_size < page_size (skip limited to a single page)
+  // Skip half a page
+  levels_skipped = reader->Skip(levels_per_page / 2);
+  ASSERT_EQ(0.5 * levels_per_page, levels_skipped);
+  // Read half a page
+  reader->ReadBatch(
+      levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
+  sub_values.clear();
+  sub_values.insert(
+      sub_values.end(), values_.begin() + 4.5 * levels_per_page, values_.end());
+  ASSERT_TRUE(vector_equal(sub_values, vresult));
+
+  values_.clear();
+  def_levels_.clear();
+  rep_levels_.clear();
+  pages_.clear();
+  reader_.reset();
+}
+
 TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
   max_def_level_ = 0;
   max_rep_level_ = 0;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 8243e85..5608192 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -23,6 +23,7 @@
 #include <cstring>
 #include <memory>
 #include <unordered_map>
+#include <vector>
 
 #include "parquet/column/levels.h"
 #include "parquet/column/page.h"
@@ -124,8 +125,12 @@ class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
   // This API is the same for both V1 and V2 of the DataPage
   //
   // @returns: actual number of levels read (see values_read for number of values read)
-  int64_t ReadBatch(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
-      T* values, int64_t* values_read);
+  int64_t ReadBatch(int batch_size, int16_t* def_levels, int16_t* rep_levels, T* values,
+      int64_t* values_read);
+
+  // Skip reading levels
+  // Returns the number of levels skipped
+  int64_t Skip(int64_t num_rows_to_skip);
 
  private:
   typedef Decoder<DType> DecoderType;
 @@ -166,7 +171,7 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_
 
   // TODO(wesm): keep reading data pages until batch_size is reached, or the
   // row group is finished
-  batch_size = std::min(batch_size, num_buffered_values_);
+  batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_);
 
   int64_t num_def_levels = 0;
   int64_t num_rep_levels = 0;
 @@ -201,6 +206,39 @@ inline int64_t TypedColumnReader<DType>::ReadBatch(int batch_size, int16_t* def_
   return total_values;
 }
 
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
+  int64_t rows_to_skip = num_rows_to_skip;
+  while (HasNext() && rows_to_skip > 0) {
+    // If the number of rows to skip is more than the number of undecoded values, skip the
+    // Page.
+    if (rows_to_skip > (num_buffered_values_ - num_decoded_values_)) {
+      rows_to_skip -= num_buffered_values_ - num_decoded_values_;
+      num_decoded_values_ = num_buffered_values_;
+    } else {
+      // We need to read this Page
+      // Jump to the right offset in the Page
+      int64_t batch_size = 1024;  // ReadBatch with a smaller memory footprint
+      int64_t values_read = 0;
+      auto vals = std::make_shared<OwnedMutableBuffer>(
+          batch_size * type_traits<DType::type_num>::value_byte_size, this->allocator_);
+      auto def_levels = std::make_shared<OwnedMutableBuffer>(
+          batch_size * sizeof(int16_t), this->allocator_);
+      auto rep_levels = std::make_shared<OwnedMutableBuffer>(
+          batch_size * sizeof(int16_t), this->allocator_);
+      do {
+        batch_size = std::min(batch_size, rows_to_skip);
+        values_read =
+            ReadBatch(batch_size, reinterpret_cast<int16_t*>(def_levels->mutable_data()),
+                reinterpret_cast<int16_t*>(rep_levels->mutable_data()),
+                reinterpret_cast<T*>(vals->mutable_data()), &values_read);
+        rows_to_skip -= values_read;
+      } while (values_read > 0 && rows_to_skip > 0);
+    }
+  }
+  return num_rows_to_skip - rows_to_skip;
+}
+
 typedef TypedColumnReader<BooleanType> BoolReader;
 typedef TypedColumnReader<Int32Type> Int32Reader;
 typedef TypedColumnReader<Int64Type> Int64Reader;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/file/file-metadata-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/file/file-metadata-test.cc
index 576db43..7bb2ae5 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/file/file-metadata-test.cc
@@ -83,6 +83,7 @@ TEST(Metadata, TestBuildAccess) {
 
   // file metadata
   ASSERT_EQ(nrows, f_accessor->num_rows());
+  ASSERT_LE(0, f_accessor->size());
   ASSERT_EQ(2, f_accessor->num_row_groups());
   ASSERT_EQ(DEFAULT_WRITER_VERSION, f_accessor->version());
   ASSERT_EQ(DEFAULT_CREATED_BY, f_accessor->created_by());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index 9964882..36098c5 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
 @@ -229,15 +229,18 @@ std::unique_ptr<ColumnChunkMetaData> RowGroupMetaData::ColumnChunk(int i) const
 // file metadata
 class FileMetaData::FileMetaDataImpl {
  public:
-  FileMetaDataImpl() {}
+  FileMetaDataImpl() : metadata_len_(0) {}
 
-  explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len) {
+  explicit FileMetaDataImpl(const uint8_t* metadata, uint32_t* metadata_len)
+      : metadata_len_(0) {
     metadata_.reset(new format::FileMetaData);
     DeserializeThriftMsg(metadata, metadata_len, metadata_.get());
+    metadata_len_ = *metadata_len;
     InitSchema();
   }
   ~FileMetaDataImpl() {}
 
+  inline uint32_t size() const { return metadata_len_; }
   inline int num_columns() const { return schema_.num_columns(); }
   inline int64_t num_rows() const { return metadata_->num_rows; }
   inline int num_row_groups() const { return metadata_->row_groups.size(); }
@@ -262,6 +265,7 @@ class FileMetaData::FileMetaDataImpl {
 
  private:
   friend FileMetaDataBuilder;
+  uint32_t metadata_len_;
   std::unique_ptr<format::FileMetaData> metadata_;
   void InitSchema() {
     schema::FlatSchemaConverter converter(
 @@ -289,6 +293,10 @@ std::unique_ptr<RowGroupMetaData> FileMetaData::RowGroup(int i) const {
   return impl_->RowGroup(i);
 }
 
+uint32_t FileMetaData::size() const {
+  return impl_->size();
+}
+
 int FileMetaData::num_columns() const {
   return impl_->num_columns();
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/7abb9c47/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index b36ced8..5b8115b 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -106,6 +106,7 @@ class PARQUET_EXPORT FileMetaData {
   ~FileMetaData();
 
   // file metadata
+  uint32_t size() const;
   int num_columns() const;
   int64_t num_rows() const;
   int num_row_groups() const;

