parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-764: Support batches for PLAIN boolean writes that aren't a multiple of 8
Date Sun, 06 Nov 2016 19:18:19 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master a278998a8 -> 86ebc2393


PARQUET-764: Support batches for PLAIN boolean writes that aren't a multiple of 8

cc @majetideepak

Author: Uwe L. Korn <uwelk@xhochy.com>

Closes #185 from xhochy/PARQUET-764 and squashes the following commits:

926e61f [Uwe L. Korn] Get rid of some re-allocations
e12dc4e [Uwe L. Korn] Fix multiline comment
2ef2da5 [Uwe L. Korn] PARQUET-764: Support batches for PLAIN boolean writes that aren't a multiple of 8


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/86ebc239
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/86ebc239
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/86ebc239

Branch: refs/heads/master
Commit: 86ebc239393e78c9888856831a3dc4504a0f6f40
Parents: a278998
Author: Uwe L. Korn <uwelk@xhochy.com>
Authored: Sun Nov 6 14:18:11 2016 -0500
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Sun Nov 6 14:18:11 2016 -0500

----------------------------------------------------------------------
 src/parquet/column/column-writer-test.cc | 20 +++++-
 src/parquet/encodings/plain-encoding.h   | 97 ++++++++++++++++-----------
 2 files changed, 75 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/86ebc239/src/parquet/column/column-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-writer-test.cc b/src/parquet/column/column-writer-test.cc
index 2269e8f..0a20ac1 100644
--- a/src/parquet/column/column-writer-test.cc
+++ b/src/parquet/column/column-writer-test.cc
@@ -214,8 +214,7 @@ void TestPrimitiveWriter<FLBAType>::ReadColumnFully(Compression::type compressio
 }
 
 typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
-    BooleanType, ByteArrayType, FLBAType>
-    TestTypes;
+    BooleanType, ByteArrayType, FLBAType> TestTypes;
 
 TYPED_TEST_CASE(TestPrimitiveWriter, TestTypes);
 
@@ -421,5 +420,22 @@ TEST_F(TestNullValuesWriter, OptionalNullValueChunk) {
   ASSERT_EQ(0, this->values_read_);
 }
 
+// PARQUET-764
+// Correct bitpacking for boolean write at non-byte boundaries
+using TestBooleanValuesWriter = TestPrimitiveWriter<BooleanType>;
+TEST_F(TestBooleanValuesWriter, AlternateBooleanValues) {
+  this->SetUpSchema(Repetition::REQUIRED);
+  auto writer = this->BuildWriter();
+  for (int i = 0; i < SMALL_SIZE; i++) {
+    bool value = (i % 2 == 0) ? true : false;
+    writer->WriteBatch(1, nullptr, nullptr, &value);
+  }
+  writer->Close();
+  this->ReadColumn();
+  for (int i = 0; i < SMALL_SIZE; i++) {
+    ASSERT_EQ((i % 2 == 0) ? true : false, this->values_out_[i]) << i;
+  }
+}
+
 }  // namespace test
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/86ebc239/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index eee3f65..b960bd2 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -181,59 +181,76 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
   explicit PlainEncoder(
       const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
       : Encoder<BooleanType>(descr, Encoding::PLAIN, allocator),
-        values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {}
+        bits_available_(IN_MEMORY_DEFAULT_CAPACITY * 8),
+        bits_buffer_(IN_MEMORY_DEFAULT_CAPACITY, allocator),
+        values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {
+    bit_writer_.reset(new BitWriter(bits_buffer_.mutable_data(), bits_buffer_.size()));
+  }
 
-  int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); }
+  int64_t EstimatedDataEncodedSize() override {
+    return values_sink_->Tell() + bit_writer_->bytes_written();
+  }
 
   std::shared_ptr<Buffer> FlushValues() override {
+    if (bits_available_ > 0) {
+      bit_writer_->Flush();
+      values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written());
+      bits_available_ = 0;
+      bit_writer_->Clear();
+      bits_available_ = bits_buffer_.size() * 8;
+    }
+
     std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
     values_sink_.reset(
         new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_));
     return buffer;
   }
 
-  void Put(const bool* src, int num_values) override {
-    Encode(src, num_values, values_sink_.get());
-  }
-
-  void Put(const std::vector<bool>& src, int num_values) {
-    Encode(src, num_values, values_sink_.get());
-  }
-
-  void Encode(const bool* src, int num_values, OutputStream* dst) {
-    int bytes_required = BitUtil::Ceil(num_values, 8);
-    OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);
-
-    BitWriter bit_writer(&tmp_buffer[0], bytes_required);
-    for (int i = 0; i < num_values; ++i) {
-      bit_writer.PutValue(src[i], 1);
-    }
-    bit_writer.Flush();
-
-    // Write the result to the output stream
-    dst->Write(bit_writer.buffer(), bit_writer.bytes_written());
+#define PLAINDECODER_BOOLEAN_PUT(input_type, function_attributes)                 \
+  void Put(input_type src, int num_values) function_attributes {                  \
+    int bit_offset = 0;                                                           \
+    if (bits_available_ > 0) {                                                    \
+      int bits_to_write = std::min(bits_available_, num_values);                  \
+      for (int i = 0; i < bits_to_write; i++) {                                   \
+        bit_writer_->PutValue(src[i], 1);                                         \
+      }                                                                           \
+      bits_available_ -= bits_to_write;                                           \
+      bit_offset = bits_to_write;                                                 \
+                                                                                  \
+      if (bits_available_ == 0) {                                                 \
+        bit_writer_->Flush();                                                     \
+        values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \
+        bit_writer_->Clear();                                                     \
+      }                                                                           \
+    }                                                                             \
+                                                                                  \
+    int bits_remaining = num_values - bit_offset;                                 \
+    while (bit_offset < num_values) {                                             \
+      bits_available_ = bits_buffer_.size() * 8;                                  \
+                                                                                  \
+      int bits_to_write = std::min(bits_available_, bits_remaining);              \
+      for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {             \
+        bit_writer_->PutValue(src[i], 1);                                         \
+      }                                                                           \
+      bit_offset += bits_to_write;                                                \
+      bits_available_ -= bits_to_write;                                           \
+      bits_remaining -= bits_to_write;                                            \
+                                                                                  \
+      if (bits_available_ == 0) {                                                 \
+        bit_writer_->Flush();                                                     \
+        values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written()); \
+        bit_writer_->Clear();                                                     \
+      }                                                                           \
+    }                                                                             \
   }
 
-  void Encode(const std::vector<bool>& src, int num_values, OutputStream* dst) {
-    int bytes_required = BitUtil::Ceil(num_values, 8);
-
-    // TODO(wesm)
-    // Use a temporary buffer for now and copy, because the BitWriter is not
-    // aware of OutputStream. Later we can add some kind of Request/Flush API
-    // to OutputStream
-    OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);
-
-    BitWriter bit_writer(&tmp_buffer[0], bytes_required);
-    for (int i = 0; i < num_values; ++i) {
-      bit_writer.PutValue(src[i], 1);
-    }
-    bit_writer.Flush();
-
-    // Write the result to the output stream
-    dst->Write(bit_writer.buffer(), bit_writer.bytes_written());
-  }
+  PLAINDECODER_BOOLEAN_PUT(const bool*, override)
+  PLAINDECODER_BOOLEAN_PUT(const std::vector<bool>&, )
 
  protected:
+  int bits_available_;
+  std::unique_ptr<BitWriter> bit_writer_;
+  OwnedMutableBuffer bits_buffer_;
   std::shared_ptr<InMemoryOutputStream> values_sink_;
 };
 


Mime
View raw message