parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-909: Reduce buffer allocations (mallocs) on critical path
Date Fri, 17 Mar 2017 22:08:05 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master a6520d34f -> d5a1aed72


PARQUET-909: Reduce buffer allocations (mallocs) on critical path

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes #268 from majetideepak/ReuseBuffers and squashes the following commits:

bbf5453 [Deepak Majeti] Review comments
4d93d4b [Deepak Majeti] Improve example
3a3e2bb [Deepak Majeti] Fix Resize shrink_to_fit
53c8ac1 [Deepak Majeti] Improve API
22f422f [Deepak Majeti] clang format
8ae02cc [Deepak Majeti] optimize for uncompressed data
3190cef [Deepak Majeti] change fit_to_size of InMemoryOutputStream
03d4862 [Deepak Majeti] clang format
261aa1c [Deepak Majeti] Rewrite Compress API
b09c4a8 [Deepak Majeti] Reuse uncompressed_data buffer
6b9a81b [Deepak Majeti] Clang format
31af602 [Deepak Majeti] Reuse levels rle buffer
82eabfb [Deepak Majeti] Re-use def and rep levels sink
edc8e3c [Deepak Majeti] Add API to InMemoryOutputStream


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/d5a1aed7
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/d5a1aed7
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/d5a1aed7

Branch: refs/heads/master
Commit: d5a1aed72c18ba6047ca469fb46661ec7a44136b
Parents: a6520d3
Author: Deepak Majeti <deepak.majeti@hpe.com>
Authored: Fri Mar 17 18:07:59 2017 -0400
Committer: Wes McKinney <wes.mckinney@twosigma.com>
Committed: Fri Mar 17 18:07:59 2017 -0400

----------------------------------------------------------------------
 examples/reader-writer.cc           |  8 ++-
 src/parquet/column/page.h           |  4 +-
 src/parquet/column/statistics.h     |  4 +-
 src/parquet/column/writer.cc        | 90 +++++++++++++++++++++-----------
 src/parquet/column/writer.h         | 11 +++-
 src/parquet/encoding-internal.h     |  8 +--
 src/parquet/file/reader-internal.cc |  2 +-
 src/parquet/file/writer-internal.cc | 31 ++++++-----
 src/parquet/file/writer-internal.h  |  4 +-
 src/parquet/util/memory.h           |  6 +++
 10 files changed, 114 insertions(+), 54 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/examples/reader-writer.cc
----------------------------------------------------------------------
diff --git a/examples/reader-writer.cc b/examples/reader-writer.cc
index 59ee63b..54390e0 100644
--- a/examples/reader-writer.cc
+++ b/examples/reader-writer.cc
@@ -110,9 +110,14 @@ int main(int argc, char** argv) {
     // Setup the parquet schema
     std::shared_ptr<GroupNode> schema = SetupSchema();
 
+    // Add writer properties
+    parquet::WriterProperties::Builder builder;
+    builder.compression(parquet::Compression::SNAPPY);
+    std::shared_ptr<parquet::WriterProperties> props = builder.build();
+
     // Create a ParquetFileWriter instance
     std::shared_ptr<parquet::ParquetFileWriter> file_writer =
-        parquet::ParquetFileWriter::Open(out_file, schema);
+        parquet::ParquetFileWriter::Open(out_file, schema, props);
 
     // Append a RowGroup with a specific number of rows.
     parquet::RowGroupWriter* rg_writer =
@@ -225,6 +230,7 @@ int main(int argc, char** argv) {
     // Create a ParquetReader instance
     std::unique_ptr<parquet::ParquetFileReader> parquet_reader =
         parquet::ParquetFileReader::OpenFile(PARQUET_FILENAME, false);
+
     // Get the File MetaData
     std::shared_ptr<parquet::FileMetaData> file_metadata = parquet_reader->metadata();
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 6670e7f..bca0ca4 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -191,7 +191,9 @@ class PageWriter {
 
   virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
 
-  virtual std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) = 0;
+  virtual bool has_compressor() = 0;
+
+  virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
 };
 
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/column/statistics.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h
index 509e587..6f12eb9 100644
--- a/src/parquet/column/statistics.h
+++ b/src/parquet/column/statistics.h
@@ -182,7 +182,7 @@ inline void TypedRowGroupStatistics<FLBAType>::Copy(
     const FLBA& src, FLBA* dst, PoolBuffer* buffer) {
   if (dst->ptr == src.ptr) return;
   uint32_t len = descr_->type_length();
-  PARQUET_THROW_NOT_OK(buffer->Resize(len));
+  PARQUET_THROW_NOT_OK(buffer->Resize(len, false));
   std::memcpy(buffer->mutable_data(), src.ptr, len);
   *dst = FLBA(buffer->data());
 }
@@ -191,7 +191,7 @@ template <>
 inline void TypedRowGroupStatistics<ByteArrayType>::Copy(
     const ByteArray& src, ByteArray* dst, PoolBuffer* buffer) {
   if (dst->ptr == src.ptr) return;
-  PARQUET_THROW_NOT_OK(buffer->Resize(src.len));
+  PARQUET_THROW_NOT_OK(buffer->Resize(src.len, false));
   std::memcpy(buffer->mutable_data(), src.ptr, src.len);
   *dst = ByteArray(src.len, buffer->data());
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index fc0372f..2ba4162 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -52,12 +52,23 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
       total_bytes_written_(0),
       closed_(false),
       fallback_(false) {
-  InitSinks();
+  definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+  repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+  definition_levels_rle_ =
+      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+  repetition_levels_rle_ =
+      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+  uncompressed_data_ =
+      std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+  if (pager_->has_compressor()) {
+    compressed_data_ =
+        std::static_pointer_cast<ResizableBuffer>(AllocateBuffer(allocator_, 0));
+  }
 }
 
 void ColumnWriter::InitSinks() {
-  definition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
-  repetition_levels_sink_.reset(new InMemoryOutputStream(allocator_));
+  definition_levels_sink_->Clear();
+  repetition_levels_sink_->Clear();
 }
 
 void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
@@ -72,68 +83,87 @@ void ColumnWriter::WriteRepetitionLevels(int64_t num_levels, const int16_t* leve
       reinterpret_cast<const uint8_t*>(levels), sizeof(int16_t) * num_levels);
 }
 
-std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels(
-    const std::shared_ptr<Buffer>& buffer, int16_t max_level) {
+// return the size of the encoded buffer
+int64_t ColumnWriter::RleEncodeLevels(
+    const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level) {
   // TODO: This only works due to some RLE specifics
   int64_t rle_size =
       LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, num_buffered_values_) +
       sizeof(int32_t);
-  std::shared_ptr<PoolBuffer> buffer_rle = AllocateBuffer(allocator_, rle_size);
+
+  // Use Arrow::Buffer::shrink_to_fit = false
+  // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
+  PARQUET_THROW_NOT_OK(dest_buffer->Resize(rle_size, false));
+
   level_encoder_.Init(Encoding::RLE, max_level, num_buffered_values_,
-      buffer_rle->mutable_data() + sizeof(int32_t), buffer_rle->size() - sizeof(int32_t));
+      dest_buffer->mutable_data() + sizeof(int32_t),
+      dest_buffer->size() - sizeof(int32_t));
   int encoded = level_encoder_.Encode(
-      num_buffered_values_, reinterpret_cast<const int16_t*>(buffer->data()));
+      num_buffered_values_, reinterpret_cast<const int16_t*>(src_buffer.data()));
   DCHECK_EQ(encoded, num_buffered_values_);
-  reinterpret_cast<int32_t*>(buffer_rle->mutable_data())[0] = level_encoder_.len();
+  reinterpret_cast<int32_t*>(dest_buffer->mutable_data())[0] = level_encoder_.len();
   int64_t encoded_size = level_encoder_.len() + sizeof(int32_t);
-  DCHECK(rle_size >= encoded_size);
-  PARQUET_THROW_NOT_OK(buffer_rle->Resize(encoded_size));
-  return std::static_pointer_cast<Buffer>(buffer_rle);
+  return encoded_size;
 }
 
 void ColumnWriter::AddDataPage() {
-  std::shared_ptr<Buffer> definition_levels = definition_levels_sink_->GetBuffer();
-  std::shared_ptr<Buffer> repetition_levels = repetition_levels_sink_->GetBuffer();
+  int64_t definition_levels_rle_size = 0;
+  int64_t repetition_levels_rle_size = 0;
+
   std::shared_ptr<Buffer> values = GetValuesBuffer();
 
   if (descr_->max_definition_level() > 0) {
-    definition_levels =
-        RleEncodeLevels(definition_levels, descr_->max_definition_level());
+    definition_levels_rle_size = RleEncodeLevels(definition_levels_sink_->GetBufferRef(),
+        definition_levels_rle_.get(), descr_->max_definition_level());
   }
 
   if (descr_->max_repetition_level() > 0) {
-    repetition_levels =
-        RleEncodeLevels(repetition_levels, descr_->max_repetition_level());
+    repetition_levels_rle_size = RleEncodeLevels(repetition_levels_sink_->GetBufferRef(),
+        repetition_levels_rle_.get(), descr_->max_repetition_level());
   }
 
   int64_t uncompressed_size =
-      definition_levels->size() + repetition_levels->size() + values->size();
+      definition_levels_rle_size + repetition_levels_rle_size + values->size();
+
+  // Use Arrow::Buffer::shrink_to_fit = false
+  // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
+  PARQUET_THROW_NOT_OK(uncompressed_data_->Resize(uncompressed_size, false));
 
   // Concatenate data into a single buffer
-  std::shared_ptr<PoolBuffer> uncompressed_data =
-      AllocateBuffer(allocator_, uncompressed_size);
-  uint8_t* uncompressed_ptr = uncompressed_data->mutable_data();
-  memcpy(uncompressed_ptr, repetition_levels->data(), repetition_levels->size());
-  uncompressed_ptr += repetition_levels->size();
-  memcpy(uncompressed_ptr, definition_levels->data(), definition_levels->size());
-  uncompressed_ptr += definition_levels->size();
+  uint8_t* uncompressed_ptr = uncompressed_data_->mutable_data();
+  memcpy(uncompressed_ptr, repetition_levels_rle_->data(), repetition_levels_rle_size);
+  uncompressed_ptr += repetition_levels_rle_size;
+  memcpy(uncompressed_ptr, definition_levels_rle_->data(), definition_levels_rle_size);
+  uncompressed_ptr += definition_levels_rle_size;
   memcpy(uncompressed_ptr, values->data(), values->size());
 
   EncodedStatistics page_stats = GetPageStatistics();
   ResetPageStatistics();
-  std::shared_ptr<Buffer> compressed_data = pager_->Compress(uncompressed_data);
-  CompressedDataPage page(compressed_data, num_buffered_values_, encoding_, Encoding::RLE,
-      Encoding::RLE, uncompressed_size, page_stats);
+
+  std::shared_ptr<Buffer> compressed_data;
+  if (pager_->has_compressor()) {
+    pager_->Compress(*(uncompressed_data_.get()), compressed_data_.get());
+    compressed_data = compressed_data_;
+  } else {
+    compressed_data = uncompressed_data_;
+  }
 
   // Write the page to OutputStream eagerly if there is no dictionary or
   // if dictionary encoding has fallen back to PLAIN
   if (has_dictionary_ && !fallback_) {  // Save pages until end of dictionary encoding
+    std::shared_ptr<Buffer> compressed_data_copy;
+    PARQUET_THROW_NOT_OK(compressed_data->Copy(
+        0, compressed_data->size(), allocator_, &compressed_data_copy));
+    CompressedDataPage page(compressed_data_copy, num_buffered_values_, encoding_,
+        Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
     data_pages_.push_back(std::move(page));
   } else {  // Eagerly write pages
+    CompressedDataPage page(compressed_data, num_buffered_values_, encoding_,
+        Encoding::RLE, Encoding::RLE, uncompressed_size, page_stats);
     WriteDataPage(page);
   }
 
-  // Re-initialize the sinks as GetBuffer made them invalid.
+  // Re-initialize the sinks for next Page.
   InitSinks();
   num_buffered_values_ = 0;
   num_buffered_encoded_values_ = 0;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index c91f261..305c35e 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -89,8 +89,9 @@ class PARQUET_EXPORT ColumnWriter {
   // Write multiple repetition levels
   void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels);
 
-  std::shared_ptr<Buffer> RleEncodeLevels(
-      const std::shared_ptr<Buffer>& buffer, int16_t max_level);
+  // RLE encode the src_buffer into dest_buffer and return the encoded size
+  int64_t RleEncodeLevels(
+      const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level);
 
   // Serialize the buffered Data Pages
   void FlushBufferedDataPages();
@@ -138,6 +139,12 @@ class PARQUET_EXPORT ColumnWriter {
   std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
   std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
 
+  std::shared_ptr<ResizableBuffer> definition_levels_rle_;
+  std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
+
+  std::shared_ptr<ResizableBuffer> uncompressed_data_;
+  std::shared_ptr<ResizableBuffer> compressed_data_;
+
   std::vector<CompressedDataPage> data_pages_;
 
  private:

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/encoding-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/encoding-internal.h b/src/parquet/encoding-internal.h
index ffcb531..dc2c336 100644
--- a/src/parquet/encoding-internal.h
+++ b/src/parquet/encoding-internal.h
@@ -379,7 +379,7 @@ inline void DictionaryDecoder<ByteArrayType>::SetDict(
   for (int i = 0; i < num_dictionary_values; ++i) {
     total_size += dictionary_[i].len;
   }
-  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size));
+  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false));
   int offset = 0;
 
   uint8_t* bytes_data = byte_array_data_->mutable_data();
@@ -399,7 +399,7 @@ inline void DictionaryDecoder<FLBAType>::SetDict(Decoder<FLBAType>* dictionary)
   int fixed_len = descr_->type_length();
   int total_size = num_dictionary_values * fixed_len;
 
-  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size));
+  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size, false));
   uint8_t* bytes_data = byte_array_data_->mutable_data();
   int offset = 0;
   for (int i = 0; i < num_dictionary_values; ++i) {
@@ -495,7 +495,7 @@ class DictEncoder : public Encoder<DType> {
         AllocateBuffer(this->allocator_, EstimatedDataEncodedSize());
     int result_size = WriteIndices(buffer->mutable_data(), EstimatedDataEncodedSize());
     ClearIndices();
-    PARQUET_THROW_NOT_OK(buffer->Resize(result_size));
+    PARQUET_THROW_NOT_OK(buffer->Resize(result_size, false));
     return buffer;
   };
 
@@ -771,7 +771,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
     if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
     if (!decoder_.GetVlqInt(&values_current_block_)) { ParquetException::EofException(); }
     if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
-    PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_));
+    PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_, false));
 
     uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index bd3fbea..c05bb12 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -97,7 +97,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
     if (decompressor_ != NULL) {
       // Grow the uncompressed buffer if we need to.
       if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
-        PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len));
+        PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false));
       }
       decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
           decompression_buffer_->mutable_data());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index ea8a338..ff6de48 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -70,22 +70,21 @@ void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
   metadata_->WriteTo(sink_);
 }
 
-std::shared_ptr<Buffer> SerializedPageWriter::Compress(
-    const std::shared_ptr<Buffer>& buffer) {
-  // Fast path, no compressor available.
-  if (!compressor_) return buffer;
+void SerializedPageWriter::Compress(
+    const Buffer& src_buffer, ResizableBuffer* dest_buffer) {
+  DCHECK(compressor_ != nullptr);
 
   // Compress the data
   int64_t max_compressed_size =
-      compressor_->MaxCompressedLen(buffer->size(), buffer->data());
+      compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());
 
-  std::shared_ptr<PoolBuffer> compression_buffer =
-      AllocateBuffer(pool_, max_compressed_size);
+  // Use Arrow::Buffer::shrink_to_fit = false
+  // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
+  PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));
 
-  int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(),
-      max_compressed_size, compression_buffer->mutable_data());
-  PARQUET_THROW_NOT_OK(compression_buffer->Resize(compressed_size));
-  return compression_buffer;
+  int64_t compressed_size = compressor_->Compress(src_buffer.size(), src_buffer.data(),
+      max_compressed_size, dest_buffer->mutable_data());
+  PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
 }
 
 int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {
@@ -124,7 +123,15 @@ int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {
 
 int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
   int64_t uncompressed_size = page.size();
-  std::shared_ptr<Buffer> compressed_data = Compress(page.buffer());
+  std::shared_ptr<Buffer> compressed_data = nullptr;
+  if (has_compressor()) {
+    auto buffer = std::static_pointer_cast<ResizableBuffer>(
+        AllocateBuffer(pool_, uncompressed_size));
+    Compress(*(page.buffer().get()), buffer.get());
+    compressed_data = std::static_pointer_cast<Buffer>(buffer);
+  } else {
+    compressed_data = page.buffer();
+  }
 
   format::DictionaryPageHeader dict_page_header;
   dict_page_header.__set_num_values(page.num_values());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index 5bd00be..e038319 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -48,7 +48,9 @@ class SerializedPageWriter : public PageWriter {
   /**
    * Compress a buffer.
    */
-  std::shared_ptr<Buffer> Compress(const std::shared_ptr<Buffer>& buffer) override;
+  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override;
+
+  bool has_compressor() override { return (compressor_ != nullptr); }
 
   void Close(bool has_dictionary, bool fallback) override;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d5a1aed7/src/parquet/util/memory.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
index e2e522d..4a15430 100644
--- a/src/parquet/util/memory.h
+++ b/src/parquet/util/memory.h
@@ -340,6 +340,12 @@ class PARQUET_EXPORT InMemoryOutputStream : public OutputStream {
 
   virtual void Write(const uint8_t* data, int64_t length);
 
+  // Clears the stream
+  void Clear() { size_ = 0; }
+
+  // Get pointer to the underlying buffer
+  const Buffer& GetBufferRef() const { return *buffer_; }
+
   // Return complete stream as Buffer
   std::shared_ptr<Buffer> GetBuffer();
 


Mime
View raw message