parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From u..@apache.org
Subject parquet-cpp git commit: PARQUET-741: Always allocate fresh buffers while compressing
Date Fri, 07 Oct 2016 08:10:06 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master ac1c1277f -> 527d53f7c


PARQUET-741: Always allocate fresh buffers while compressing

Introduces an additional allocation in exchange for a compression path that actually works.

Also extended the column-writer test to write several columns.

Author: Uwe L. Korn <uwelk@xhochy.com>
Author: Korn, Uwe <Uwe.Korn@blue-yonder.com>

Closes #173 from xhochy/PARQUET-741 and squashes the following commits:

ce46816 [Uwe L. Korn] Use emplace_back to get rid of the shared_ptr
0d2f041 [Uwe L. Korn] Fix signed comparison
ac1ccf0 [Uwe L. Korn] Minor style fixes
4cb03f8 [Uwe L. Korn] Fix FLBA tests
a559123 [Korn, Uwe] PARQUET-741: Always allocate fresh buffers while compressing


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/527d53f7
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/527d53f7
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/527d53f7

Branch: refs/heads/master
Commit: 527d53f7cdd192ff62260c25a513fe7f97b81e65
Parents: ac1c127
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Fri Oct 7 10:09:43 2016 +0200
Committer: Uwe L. Korn <uwelk@xhochy.com>
Committed: Fri Oct 7 10:09:43 2016 +0200

----------------------------------------------------------------------
 src/parquet/column/column-writer-test.cc | 84 +++++++++++++++++++++++----
 src/parquet/file/writer-internal.cc      | 11 ++--
 src/parquet/file/writer-internal.h       |  4 --
 3 files changed, 78 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/527d53f7/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 29139c9..9f04c06 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -25,6 +25,7 @@
 #include "parquet/file/reader-internal.h"
 #include "parquet/file/writer-internal.h"
 #include "parquet/types.h"
+#include "parquet/util/comparison.h"
 #include "parquet/util/input.h"
 #include "parquet/util/output.h"
 
@@ -38,7 +39,7 @@ namespace test {
 // The default size used in most tests.
 const int SMALL_SIZE = 100;
 // Larger size to test some corner cases, only used in some specific cases.
-const int LARGE_SIZE = 10000;
+const int LARGE_SIZE = 100000;
 // Very large size to test dictionary fallback.
 const int VERY_LARGE_SIZE = 400000;
 
@@ -97,26 +98,37 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
     this->SyncValuesOut();
   }
 
+  void ReadColumnFully(Compression::type compression = Compression::UNCOMPRESSED);
+
   void TestRequiredWithEncoding(Encoding::type encoding) {
     return TestRequiredWithSettings(encoding, Compression::UNCOMPRESSED, false, false);
   }
 
   void TestRequiredWithSettings(Encoding::type encoding, Compression::type compression,
-      bool enable_dictionary, bool enable_statistics) {
-    this->GenerateData(SMALL_SIZE);
+      bool enable_dictionary, bool enable_statistics, int64_t num_rows = SMALL_SIZE) {
+    this->GenerateData(num_rows);
 
     // Test case 1: required and non-repeated, so no definition or repetition levels
     ColumnProperties column_properties(
         encoding, compression, enable_dictionary, enable_statistics);
     std::shared_ptr<TypedColumnWriter<TestType>> writer =
-        this->BuildWriter(SMALL_SIZE, column_properties);
+        this->BuildWriter(num_rows, column_properties);
     writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
     // The behaviour should be independent from the number of Close() calls
     writer->Close();
     writer->Close();
 
-    this->ReadColumn(compression);
-    ASSERT_EQ(SMALL_SIZE, this->values_read_);
+    this->SetupValuesOut(num_rows);
+    this->ReadColumnFully(compression);
+    Compare<T> compare(this->descr_);
+    for (size_t i = 0; i < this->values_.size(); i++) {
+      if (compare(this->values_[i], this->values_out_[i]) ||
+          compare(this->values_out_[i], this->values_[i])) {
+        std::cout << "Failed at " << i << std::endl;
+      }
+      ASSERT_FALSE(compare(this->values_[i], this->values_out_[i]));
+      ASSERT_FALSE(compare(this->values_out_[i], this->values_[i]));
+    }
     ASSERT_EQ(this->values_, this->values_out_);
   }
 
@@ -154,8 +166,53 @@ class TestPrimitiveWriter : public PrimitiveTypedTest<TestType> {
   std::unique_ptr<ColumnChunkMetaDataBuilder> metadata_;
   std::unique_ptr<InMemoryOutputStream> sink_;
   std::shared_ptr<WriterProperties> writer_properties_;
+  std::vector<std::vector<uint8_t>> data_buffer_;
 };
 
+template <typename TestType>
+void TestPrimitiveWriter<TestType>::ReadColumnFully(Compression::type compression) {
+  BuildReader(compression);
+  values_read_ = 0;
+  while (values_read_ < static_cast<int64_t>(this->values_out_.size())) {
+    int64_t values_read_recently = 0;
+    reader_->ReadBatch(this->values_out_.size() - values_read_,
+        definition_levels_out_.data() + values_read_,
+        repetition_levels_out_.data() + values_read_,
+        this->values_out_ptr_ + values_read_, &values_read_recently);
+    values_read_ += values_read_recently;
+  }
+  this->SyncValuesOut();
+}
+
+template <>
+void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compression) {
+  BuildReader(compression);
+  this->data_buffer_.clear();
+
+  values_read_ = 0;
+  while (values_read_ < static_cast<int64_t>(this->values_out_.size())) {
+    int64_t values_read_recently = 0;
+    reader_->ReadBatch(this->values_out_.size() - values_read_,
+        definition_levels_out_.data() + values_read_,
+        repetition_levels_out_.data() + values_read_,
+        this->values_out_ptr_ + values_read_, &values_read_recently);
+
+    // Copy contents of the pointers
+    std::vector<uint8_t> data(values_read_recently * this->descr_->type_length());
+    uint8_t* data_ptr = data.data();
+    for (int64_t i = 0; i < values_read_recently; i++) {
+      memcpy(data_ptr + this->descr_->type_length() * i,
+          this->values_out_[i + values_read_].ptr, this->descr_->type_length());
+      this->values_out_[i + values_read_].ptr =
+          data_ptr + this->descr_->type_length() * i;
+    }
+    data_buffer_.emplace_back(std::move(data));
+
+    values_read_ += values_read_recently;
+  }
+  this->SyncValuesOut();
+}
+
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
     BooleanType, ByteArrayType, FLBAType> TestTypes;
 
@@ -198,23 +255,28 @@ TYPED_TEST(TestPrimitiveWriter, RequiredRLEDictionary) {
 */
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithSnappyCompression) {
-  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, false);
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::SNAPPY, false, false, LARGE_SIZE);
 }
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithGzipCompression) {
-  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, false);
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::GZIP, false, false, LARGE_SIZE);
 }
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStats) {
-  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::UNCOMPRESSED, false, true);
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::UNCOMPRESSED, false, true, LARGE_SIZE);
 }
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndSnappyCompression) {
-  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::SNAPPY, false, true);
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::SNAPPY, false, true, LARGE_SIZE);
 }
 
 TYPED_TEST(TestPrimitiveWriter, RequiredPlainWithStatsAndGzipCompression) {
-  this->TestRequiredWithSettings(Encoding::PLAIN, Compression::GZIP, false, true);
+  this->TestRequiredWithSettings(
+      Encoding::PLAIN, Compression::GZIP, false, true, LARGE_SIZE);
 }
 
 TYPED_TEST(TestPrimitiveWriter, Optional) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/527d53f7/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index 5a7c70e..c4681bd 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -41,8 +41,7 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type
       dictionary_page_offset_(0),
       data_page_offset_(0),
       total_uncompressed_size_(0),
-      total_compressed_size_(0),
-      compression_buffer_(std::make_shared<OwnedMutableBuffer>(0, allocator)) {
+      total_compressed_size_(0) {
   compressor_ = Codec::Create(codec);
 }
 
@@ -72,11 +71,11 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress(
   // Compress the data
   int64_t max_compressed_size =
       compressor_->MaxCompressedLen(buffer->size(), buffer->data());
-  compression_buffer_->Resize(max_compressed_size);
+  auto compression_buffer = std::make_shared<OwnedMutableBuffer>(max_compressed_size);
   int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(),
-      max_compressed_size, compression_buffer_->mutable_data());
-  compression_buffer_->Resize(compressed_size);
-  return compression_buffer_;
+      max_compressed_size, compression_buffer->mutable_data());
+  compression_buffer->Resize(compressed_size);
+  return compression_buffer;
 }
 
 int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/527d53f7/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 2095154..f1f76ab 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -46,9 +46,6 @@ class SerializedPageWriter : public PageWriter {
 
   /**
    * Compress a buffer.
-   *
-   * This method may return compression_buffer_ and thus the resulting memory
-   * is only valid until the next call to Compress().
    */
   std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) override;
 
@@ -65,7 +62,6 @@ class SerializedPageWriter : public PageWriter {
 
   // Compression codec to use.
   std::unique_ptr<Codec> compressor_;
-  std::shared_ptr<OwnedMutableBuffer> compression_buffer_;
 };
 
 // RowGroupWriter::Contents implementation for the Parquet file specification


Mime
View raw message