parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jul...@apache.org
Subject parquet-cpp git commit: PARQUET-533: Add a Buffer abstraction, refactor input/output classes to be simpler using Buffers
Date Sun, 21 Feb 2016 03:26:34 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 891985439 -> b259188dc


PARQUET-533: Add a Buffer abstraction, refactor input/output classes to be simpler using Buffers

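I have also removed all RowGroupReader and ColumnReader caching until we have a clearer idea
of how caching would help users while still keeping memory use under control. As a result,
`ParquetFileReader::RowGroup(i)` now returns a new `std::shared_ptr<RowGroupReader>` on each
call instead of a cached raw pointer, and `RowGroupReader::Column(i)` builds a fresh
`ColumnReader` each time.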

The goal of this patch is to encapsulate the "data pointer and size" concept and lightly
abstract away buffer ownership. The particular motivation is being able to handle both
normal files (bytes arriving via `fread`) and memory-mapped files (where copying bytes into
new memory is not required). It also does away with a number of functions that wrote output
into a `std::vector<uint8_t>`, in favor of the `OwnedMutableBuffer`. Feedback welcome;
there is plenty more work that can be done here.

Requires PARQUET-457. Will rebase once that's merged.

Author: Wes McKinney <wesm@apache.org>

Closes #59 from wesm/PARQUET-533 and squashes the following commits:

a39a4bc [Wes McKinney] Add Buffer abstraction, refactor input/output interface classes to
use it where relevant. Output Buffer from InMemoryOutputStream. Stop caching ColumnReader
objects


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/b259188d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/b259188d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/b259188d

Branch: refs/heads/master
Commit: b259188dcb11e915b2d2247a45d0cc0823a13d36
Parents: 8919854
Author: Wes McKinney <wesm@apache.org>
Authored: Sat Feb 20 19:26:10 2016 -0800
Committer: Julien Le Dem <julien@dremio.com>
Committed: Sat Feb 20 19:26:10 2016 -0800

----------------------------------------------------------------------
 src/parquet/column/column-reader-test.cc     |  12 +-
 src/parquet/column/page.h                    |  24 ++--
 src/parquet/column/test-util.h               |   9 +-
 src/parquet/encodings/plain-encoding-test.cc |  17 +--
 src/parquet/file/file-deserialize-test.cc    |  10 +-
 src/parquet/file/reader-internal.cc          |  32 ++---
 src/parquet/file/reader-internal.h           |   4 +-
 src/parquet/file/reader.cc                   |  25 +---
 src/parquet/file/reader.h                    |   9 +-
 src/parquet/reader-test.cc                   |   6 +-
 src/parquet/util/CMakeLists.txt              |   2 +
 src/parquet/util/buffer-test.cc              |  57 +++++++++
 src/parquet/util/buffer.cc                   |  51 ++++++++
 src/parquet/util/buffer.h                    | 140 ++++++++++++++++++++++
 src/parquet/util/input.cc                    |  60 +++++-----
 src/parquet/util/input.h                     |  47 ++++----
 src/parquet/util/output-test.cc              |   8 +-
 src/parquet/util/output.cc                   |  19 +--
 src/parquet/util/output.h                    |  10 +-
 19 files changed, 378 insertions(+), 164 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 828ef31..3a2bbd8 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -63,9 +63,8 @@ class TestPrimitiveReader : public ::testing::Test {
 TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
   vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
 
-  std::vector<uint8_t> buffer;
   std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, {}, 0,
-    {}, 0, &buffer);
+    {}, 0);
   pages_.push_back(page);
 
   NodePtr type = schema::Int32("a", Repetition::REQUIRED);
@@ -90,9 +89,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
   vector<int32_t> values = {1, 2, 3, 4, 5};
   vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};
 
-  std::vector<uint8_t> buffer;
   std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, def_levels, 1,
-    {}, 0, &buffer);
+    {}, 0);
 
   pages_.push_back(page);
 
@@ -137,9 +135,8 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
   vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
   vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};
 
-  std::vector<uint8_t> buffer;
   std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values,
-      def_levels, 2, rep_levels, 1, &buffer);
+      def_levels, 2, rep_levels, 1);
 
   pages_.push_back(page);
 
@@ -190,12 +187,11 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeatedMultiplePages) {
   vector<int16_t> rep_levels[2] = {{0, 1, 1, 0, 0, 1, 1, 0, 0, 1},
     {0, 0, 1, 0, 1, 1, 0, 1, 0, 1}};
 
-  std::vector<uint8_t> buffer[4];
   std::shared_ptr<DataPage> page;
 
   for (int i = 0; i < 4; i++) {
     page = MakeDataPage<Type::INT32>(values[i % 2],
-        def_levels[i % 2], 2, rep_levels[i % 2], 1, &buffer[i]);
+        def_levels[i % 2], 2, rep_levels[i % 2], 1);
     pages_.push_back(page);
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
index 916fd12..3e5fd54 100644
--- a/src/parquet/column/page.h
+++ b/src/parquet/column/page.h
@@ -27,6 +27,7 @@
 #include <string>
 
 #include "parquet/types.h"
+#include "parquet/util/buffer.h"
 
 namespace parquet_cpp {
 
@@ -39,9 +40,8 @@ namespace parquet_cpp {
 // here, both on the read and write path
 class Page {
  public:
-  Page(const uint8_t* buffer, int32_t buffer_size, PageType::type type) :
+  Page(const std::shared_ptr<Buffer>& buffer, PageType::type type) :
       buffer_(buffer),
-      buffer_size_(buffer_size),
       type_(type) {}
 
   PageType::type type() const {
@@ -50,29 +50,27 @@ class Page {
 
   // @returns: a pointer to the page's data
   const uint8_t* data() const {
-    return buffer_;
+    return buffer_->data();
   }
 
   // @returns: the total size in bytes of the page's data buffer
   int32_t size() const {
-    return buffer_size_;
+    return buffer_->size();
   }
 
  private:
-  const uint8_t* buffer_;
-  int32_t buffer_size_;
-
+  std::shared_ptr<Buffer> buffer_;
   PageType::type type_;
 };
 
 
 class DataPage : public Page {
  public:
-  DataPage(const uint8_t* buffer, int32_t buffer_size,
+  DataPage(const std::shared_ptr<Buffer>& buffer,
       int32_t num_values, Encoding::type encoding,
       Encoding::type definition_level_encoding,
       Encoding::type repetition_level_encoding) :
-      Page(buffer, buffer_size, PageType::DATA_PAGE),
+      Page(buffer, PageType::DATA_PAGE),
       num_values_(num_values),
       encoding_(encoding),
       definition_level_encoding_(definition_level_encoding),
@@ -119,12 +117,12 @@ class DataPage : public Page {
 
 class DataPageV2 : public Page {
  public:
-  DataPageV2(const uint8_t* buffer, int32_t buffer_size,
+  DataPageV2(const std::shared_ptr<Buffer>& buffer,
       int32_t num_values, int32_t num_nulls, int32_t num_rows,
       Encoding::type encoding,
       int32_t definition_levels_byte_length,
       int32_t repetition_levels_byte_length, bool is_compressed = false) :
-      Page(buffer, buffer_size, PageType::DATA_PAGE_V2),
+      Page(buffer, PageType::DATA_PAGE_V2),
       num_values_(num_values),
       num_nulls_(num_nulls),
       num_rows_(num_rows),
@@ -176,9 +174,9 @@ class DataPageV2 : public Page {
 
 class DictionaryPage : public Page {
  public:
-  DictionaryPage(const uint8_t* buffer, int32_t buffer_size,
+  DictionaryPage(const std::shared_ptr<Buffer>& buffer,
       int32_t num_values, Encoding::type encoding, bool is_sorted = false) :
-      Page(buffer, buffer_size, PageType::DICTIONARY_PAGE),
+      Page(buffer, PageType::DICTIONARY_PAGE),
       num_values_(num_values),
       encoding_(encoding),
       is_sorted_(is_sorted) {}

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index b12f340..99f56b1 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -167,8 +167,7 @@ class DataPageBuilder {
 template <int TYPE, typename T>
 static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
     const std::vector<int16_t>& def_levels, int16_t max_def_level,
-    const std::vector<int16_t>& rep_levels, int16_t max_rep_level,
-    std::vector<uint8_t>* out_buffer) {
+    const std::vector<int16_t>& rep_levels, int16_t max_rep_level) {
   size_t num_values = values.size();
 
   InMemoryOutputStream page_stream;
@@ -183,10 +182,10 @@ static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
   }
 
   page_builder.AppendValues(values);
-  page_stream.Transfer(out_buffer);
 
-  return std::make_shared<DataPage>(&(*out_buffer)[0], out_buffer->size(),
-      page_builder.num_values(),
+  auto buffer = page_stream.GetBuffer();
+
+  return std::make_shared<DataPage>(buffer, page_builder.num_values(),
       page_builder.encoding(),
       page_builder.def_level_encoding(),
       page_builder.rep_level_encoding());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/encodings/plain-encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc
index 5091dc8..04eb907 100644
--- a/src/parquet/encodings/plain-encoding-test.cc
+++ b/src/parquet/encodings/plain-encoding-test.cc
@@ -28,6 +28,7 @@
 #include "parquet/types.h"
 #include "parquet/schema/types.h"
 #include "parquet/util/bit-util.h"
+#include "parquet/util/buffer.h"
 #include "parquet/util/output.h"
 #include "parquet/util/test-common.h"
 
@@ -52,15 +53,13 @@ TEST(VectorBooleanTest, TestEncodeDecode) {
   InMemoryOutputStream dst;
   encoder.Encode(draws, nvalues, &dst);
 
-  vector<uint8_t> encode_buffer;
-  dst.Transfer(&encode_buffer);
-
-  ASSERT_EQ(nbytes, encode_buffer.size());
+  std::shared_ptr<Buffer> encode_buffer = dst.GetBuffer();
+  ASSERT_EQ(nbytes, encode_buffer->size());
 
   vector<uint8_t> decode_buffer(nbytes);
   const uint8_t* decode_data = &decode_buffer[0];
 
-  decoder.SetData(nvalues, &encode_buffer[0], encode_buffer.size());
+  decoder.SetData(nvalues, encode_buffer->data(), encode_buffer->size());
   size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
   ASSERT_EQ(nvalues, values_decoded);
 
@@ -92,9 +91,10 @@ class EncodeDecode{
     InMemoryOutputStream dst;
     encoder.Encode(draws_, num_values_, &dst);
 
-    dst.Transfer(&encode_buffer_);
+    encode_buffer_ = dst.GetBuffer();
 
-    decoder.SetData(num_values_, &encode_buffer_[0], encode_buffer_.size());
+    decoder.SetData(num_values_, encode_buffer_->data(),
+        encode_buffer_->size());
     size_t values_decoded = decoder.Decode(decode_buf_, num_values_);
     ASSERT_EQ(num_values_, values_decoded);
   }
@@ -119,7 +119,8 @@ class EncodeDecode{
   vector<uint8_t> input_bytes_;
   vector<uint8_t> output_bytes_;
   vector<uint8_t> data_buffer_;
-  vector<uint8_t> encode_buffer_;
+
+  std::shared_ptr<Buffer> encode_buffer_;
 };
 
 template<>

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index cfb3e86..40d599f 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -67,8 +67,7 @@ class TestPageSerde : public ::testing::Test {
       Compression::UNCOMPRESSED) {
     EndStream();
     std::unique_ptr<InputStream> stream;
-    stream.reset(new InMemoryInputStream(out_buffer_.data(),
-            out_buffer_.size()));
+    stream.reset(new InMemoryInputStream(out_buffer_));
     page_reader_.reset(new SerializedPageReader(std::move(stream), codec));
   }
 
@@ -89,19 +88,16 @@ class TestPageSerde : public ::testing::Test {
   }
 
   void ResetStream() {
-    out_buffer_.resize(0);
     out_stream_.reset(new InMemoryOutputStream());
   }
 
   void EndStream() {
-    out_stream_->Transfer(&out_buffer_);
+    out_buffer_ = out_stream_->GetBuffer();
   }
 
  protected:
   std::unique_ptr<InMemoryOutputStream> out_stream_;
-
-  // TODO(wesm): Owns the results of the output stream. To be refactored
-  std::vector<uint8_t> out_buffer_;
+  std::shared_ptr<Buffer> out_buffer_;
 
   std::unique_ptr<SerializedPageReader> page_reader_;
   parquet::PageHeader page_header_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 0a93b00..14fcbac 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -32,6 +32,7 @@
 #include "parquet/schema/types.h"
 #include "parquet/thrift/util.h"
 #include "parquet/types.h"
+#include "parquet/util/buffer.h"
 #include "parquet/util/input.h"
 
 namespace parquet_cpp {
@@ -83,7 +84,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       }
     }
     // Advance the stream offset
-    stream_->Read(header_size, &bytes_read);
+    stream_->Advance(header_size);
 
     int compressed_len = current_page_header_.compressed_page_size;
     int uncompressed_len = current_page_header_.uncompressed_page_size;
@@ -103,19 +104,21 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
       buffer = &decompression_buffer_[0];
     }
 
+    auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);
+
     if (current_page_header_.type == parquet::PageType::DICTIONARY_PAGE) {
       const parquet::DictionaryPageHeader& dict_header =
         current_page_header_.dictionary_page_header;
 
       bool is_sorted = dict_header.__isset.is_sorted? dict_header.is_sorted : false;
 
-      return std::make_shared<DictionaryPage>(buffer, uncompressed_len,
+      return std::make_shared<DictionaryPage>(page_buffer,
           dict_header.num_values, FromThrift(dict_header.encoding),
           is_sorted);
     } else if (current_page_header_.type == parquet::PageType::DATA_PAGE) {
       const parquet::DataPageHeader& header = current_page_header_.data_page_header;
 
-      auto page = std::make_shared<DataPage>(buffer, uncompressed_len,
+      auto page = std::make_shared<DataPage>(page_buffer,
           header.num_values,
           FromThrift(header.encoding),
           FromThrift(header.definition_level_encoding),
@@ -134,7 +137,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
     } else if (current_page_header_.type == parquet::PageType::DATA_PAGE_V2) {
       const parquet::DataPageHeaderV2& header = current_page_header_.data_page_header_v2;
       bool is_compressed = header.__isset.is_compressed? header.is_compressed : false;
-      return std::make_shared<DataPageV2>(buffer, uncompressed_len,
+      return std::make_shared<DataPageV2>(page_buffer,
           header.num_values, header.num_nulls, header.num_rows,
           FromThrift(header.encoding),
           header.definition_levels_byte_length,
@@ -165,24 +168,15 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
     col_start = col.meta_data.dictionary_page_offset;
   }
 
-  // TODO(wesm): some input streams (e.g. memory maps) may not require
-  // copying data. This should be added to the input stream API to support
-  // zero-copy streaming
-  std::unique_ptr<InputStream> input(
-      new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
-
-  source_->Seek(col_start);
-  ScopedInMemoryInputStream* scoped_input =
-    static_cast<ScopedInMemoryInputStream*>(input.get());
-  size_t bytes_read = source_->Read(scoped_input->size(), scoped_input->data());
+  int64_t bytes_to_read = col.meta_data.total_compressed_size;
+  std::shared_ptr<Buffer> buffer = source_->ReadAt(col_start, bytes_to_read);
 
-  if (bytes_read != scoped_input->size()) {
+  if (buffer->size() < bytes_to_read) {
     throw ParquetException("Unable to read column chunk data");
   }
 
-  const ColumnDescriptor* descr = schema_->Column(i);
-
-  return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(input),
+  std::unique_ptr<InputStream> stream(new InMemoryInputStream(buffer));
+  return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(stream),
           FromThrift(col.meta_data.codec)));
 }
 
@@ -223,7 +217,7 @@ void SerializedFile::Close() {
 
 std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
   std::unique_ptr<SerializedRowGroup> contents(new SerializedRowGroup(source_.get(),
-          &schema_, &metadata_.row_groups[i]));
+          &metadata_.row_groups[i]));
 
   return std::make_shared<RowGroupReader>(&schema_, std::move(contents));
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index b7e9154..7aff74a 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -72,10 +72,9 @@ class SerializedPageReader : public PageReader {
 // RowGroupReader::Contents implementation for the Parquet file specification
 class SerializedRowGroup : public RowGroupReader::Contents {
  public:
-  SerializedRowGroup(RandomAccessSource* source, const SchemaDescriptor* schema,
+  SerializedRowGroup(RandomAccessSource* source,
       const parquet::RowGroup* metadata) :
       source_(source),
-      schema_(schema),
       metadata_(metadata) {}
 
   virtual int num_columns() const;
@@ -84,7 +83,6 @@ class SerializedRowGroup : public RowGroupReader::Contents {
 
  private:
   RandomAccessSource* source_;
-  const SchemaDescriptor* schema_;
   const parquet::RowGroup* metadata_;
 };
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 9da0f09..a2885a7 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -51,19 +51,10 @@ int RowGroupReader::num_columns() const {
 
 std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
   // TODO: boundschecking
-  auto it = column_readers_.find(i);
-  if (it !=  column_readers_.end()) {
-    // Already have constructed the ColumnReader
-    return it->second;
-  }
-
   const ColumnDescriptor* descr = schema_->Column(i);
 
   std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
-  std::shared_ptr<ColumnReader> reader = ColumnReader::Make(descr,
-      std::move(page_reader));
-  column_readers_[i] = reader;
-  return reader;
+  return ColumnReader::Make(descr, std::move(page_reader));
 }
 
 RowGroupStatistics RowGroupReader::GetColumnStats(int i) const {
@@ -109,7 +100,7 @@ int ParquetFileReader::num_columns() const {
   return schema_->num_columns();
 }
 
-RowGroupReader* ParquetFileReader::RowGroup(int i) {
+std::shared_ptr<RowGroupReader> ParquetFileReader::RowGroup(int i) {
   if (i >= num_row_groups()) {
     std::stringstream ss;
     ss << "The file only has " << num_row_groups()
@@ -118,14 +109,7 @@ RowGroupReader* ParquetFileReader::RowGroup(int i) {
     throw ParquetException(ss.str());
   }
 
-  auto it = row_group_readers_.find(i);
-  if (it != row_group_readers_.end()) {
-    // Constructed the RowGroupReader already
-    return it->second.get();
-  }
-
-  row_group_readers_[i] = contents_->GetRowGroup(i);
-  return row_group_readers_[i].get();
+  return contents_->GetRowGroup(i);
 }
 
 // ----------------------------------------------------------------------
@@ -134,7 +118,6 @@ RowGroupReader* ParquetFileReader::RowGroup(int i) {
 // the fixed initial size is just for an example
 #define COL_WIDTH "20"
 
-
 void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
   stream << "File statistics:\n";
   stream << "Total rows: " << this->num_rows() << "\n";
@@ -151,7 +134,7 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
   for (int r = 0; r < num_row_groups(); ++r) {
     stream << "--- Row Group " << r << " ---\n";
 
-    RowGroupReader* group_reader = RowGroup(r);
+    auto group_reader = RowGroup(r);
 
     // Print column metadata
     size_t num_columns = group_reader->num_columns();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 32ae429..fcc2c18 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -22,7 +22,6 @@
 #include <iosfwd>
 #include <memory>
 #include <string>
-#include <unordered_map>
 
 #include "parquet/column/page.h"
 #include "parquet/schema/descriptor.h"
@@ -63,9 +62,6 @@ class RowGroupReader {
   // This is declared in the .cc file so that we can hide compiled Thrift
   // headers from the public API and also more easily create test fixtures.
   std::unique_ptr<Contents> contents_;
-
-  // Column index -> ColumnReader
-  std::unordered_map<int, std::shared_ptr<ColumnReader> > column_readers_;
 };
 
 
@@ -99,7 +95,7 @@ class ParquetFileReader {
   void Close();
 
   // The RowGroupReader is owned by the FileReader
-  RowGroupReader* RowGroup(int i);
+  std::shared_ptr<RowGroupReader> RowGroup(int i);
 
   int num_columns() const;
   int64_t num_rows() const;
@@ -124,9 +120,6 @@ class ParquetFileReader {
 
   // The SchemaDescriptor is provided by the Contents impl
   const SchemaDescriptor* schema_;
-
-  // Row group index -> RowGroupReader
-  std::unordered_map<int, std::shared_ptr<RowGroupReader> > row_group_readers_;
 };
 
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index 97a5f79..be22e5a 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -56,7 +56,7 @@ TEST_F(TestAllTypesPlain, NoopConstructDestruct) {
 }
 
 TEST_F(TestAllTypesPlain, TestBatchRead) {
-  RowGroupReader* group = reader_->RowGroup(0);
+  std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
 
   // column 0, id
   std::shared_ptr<Int32Reader> col =
@@ -84,7 +84,7 @@ TEST_F(TestAllTypesPlain, TestBatchRead) {
 }
 
 TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
-  RowGroupReader* group = reader_->RowGroup(0);
+  std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
 
   // column 0, id
   std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
@@ -101,7 +101,7 @@ TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
 
 
 TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) {
-  RowGroupReader* group = reader_->RowGroup(0);
+  std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
 
   // column 0, id
   std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index 504069f..2b782fc 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -32,6 +32,7 @@ install(FILES
   DESTINATION include/parquet/util)
 
 add_library(parquet_util STATIC
+  buffer.cc
   input.cc
   output.cc
   cpu-info.cc
@@ -56,5 +57,6 @@ if(PARQUET_BUILD_TESTS)
 endif()
 
 ADD_PARQUET_TEST(bit-util-test)
+ADD_PARQUET_TEST(buffer-test)
 ADD_PARQUET_TEST(output-test)
 ADD_PARQUET_TEST(rle-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/util/buffer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer-test.cc b/src/parquet/util/buffer-test.cc
new file mode 100644
index 0000000..7772e49
--- /dev/null
+++ b/src/parquet/util/buffer-test.cc
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+#include <cstdlib>
+#include <cstdint>
+#include <limits>
+#include <memory>
+#include <string>
+
+#include "parquet/exception.h"
+#include "parquet/util/buffer.h"
+
+using std::string;
+
+namespace parquet_cpp {
+
+class TestBuffer : public ::testing::Test {
+};
+
+TEST_F(TestBuffer, Resize) {
+  OwnedMutableBuffer buf;
+
+  ASSERT_EQ(0, buf.size());
+  ASSERT_NO_THROW(buf.Resize(100));
+  ASSERT_EQ(100, buf.size());
+  ASSERT_NO_THROW(buf.Resize(200));
+  ASSERT_EQ(200, buf.size());
+
+  // Make it smaller, too
+  ASSERT_NO_THROW(buf.Resize(50));
+  ASSERT_EQ(50, buf.size());
+}
+
+TEST_F(TestBuffer, ResizeOOM) {
+  // realloc fails, even though there may be no explicit limit
+  OwnedMutableBuffer buf;
+  ASSERT_NO_THROW(buf.Resize(100));
+  int64_t to_alloc = std::numeric_limits<int64_t>::max();
+  ASSERT_THROW(buf.Resize(to_alloc), ParquetException);
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/util/buffer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer.cc b/src/parquet/util/buffer.cc
new file mode 100644
index 0000000..c069419
--- /dev/null
+++ b/src/parquet/util/buffer.cc
@@ -0,0 +1,51 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/util/buffer.h"
+
+#include <cstdint>
+
+#include "parquet/exception.h"
+
+namespace parquet_cpp {
+
+Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset,
+    int64_t size) {
+  data_ = parent->data() + offset;
+  size_ = size;
+  parent_ = parent;
+}
+
+std::shared_ptr<Buffer> MutableBuffer::GetImmutableView() {
+  return std::make_shared<Buffer>(this->get_shared_ptr(), 0, size());
+}
+
+OwnedMutableBuffer::OwnedMutableBuffer() :
+    ResizableBuffer(nullptr, 0) {}
+
+void OwnedMutableBuffer::Resize(int64_t new_size) {
+  size_ = new_size;
+  try {
+    buffer_owner_.resize(new_size);
+  } catch (const std::bad_alloc& e) {
+    throw ParquetException("OOM: resize failed");
+  }
+  data_ = buffer_owner_.data();
+  mutable_data_ = buffer_owner_.data();
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/util/buffer.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer.h b/src/parquet/util/buffer.h
new file mode 100644
index 0000000..8be2b17
--- /dev/null
+++ b/src/parquet/util/buffer.h
@@ -0,0 +1,140 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_BUFFER_H
+#define PARQUET_UTIL_BUFFER_H
+
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "parquet/util/macros.h"
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// Buffer classes
+
+// Immutable API for a chunk of bytes which may or may not be owned by the
+// class instance
+class Buffer : public std::enable_shared_from_this<Buffer> {
+ public:
+  Buffer(const uint8_t* data, int64_t size) :
+      data_(data),
+      size_(size) {}
+
+  // An offset into data that is owned by another buffer, but we want to be
+  // able to retain a valid pointer to it even after other shared_ptr's to the
+  // parent buffer have been destroyed
+  Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size);
+
+  std::shared_ptr<Buffer> get_shared_ptr() {
+    return shared_from_this();
+  }
+
+  // Return true if both buffers are the same size and contain the same bytes
+  // up to the number of compared bytes
+  bool Equals(const Buffer& other, int64_t nbytes) const {
+    return this == &other ||
+      (size_ >= nbytes && other.size_ >= nbytes &&
+          !memcmp(data_, other.data_, nbytes));
+  }
+
+  bool Equals(const Buffer& other) const {
+    return this == &other ||
+      (size_ == other.size_ && !memcmp(data_, other.data_, size_));
+  }
+
+  const uint8_t* data() const {
+    return data_;
+  }
+
+  int64_t size() const {
+    return size_;
+  }
+
+  // Returns true if this Buffer is referencing memory (possibly) owned by some
+  // other buffer
+  bool is_shared() const {
+    return static_cast<bool>(parent_);
+  }
+
+  const std::shared_ptr<Buffer> parent() const {
+    return parent_;
+  }
+
+ protected:
+  const uint8_t* data_;
+  int64_t size_;
+
+  // nullptr by default, but may be set
+  std::shared_ptr<Buffer> parent_;
+
+ private:
+  DISALLOW_COPY_AND_ASSIGN(Buffer);
+};
+
+// A Buffer whose contents can be mutated. May or may not own its data.
+class MutableBuffer : public Buffer {
+ public:
+  MutableBuffer(uint8_t* data, int64_t size) :
+      Buffer(data, size) {
+    mutable_data_ = data;
+  }
+
+  uint8_t* mutable_data() {
+    return mutable_data_;
+  }
+
+  // Get a read-only view of this buffer
+  std::shared_ptr<Buffer> GetImmutableView();
+
+ protected:
+  MutableBuffer() :
+      Buffer(nullptr, 0),
+      mutable_data_(nullptr) {}
+
+  uint8_t* mutable_data_;
+};
+
+class ResizableBuffer : public MutableBuffer {
+ public:
+  virtual void Resize(int64_t new_size) = 0;
+
+ protected:
+  ResizableBuffer(uint8_t* data, int64_t size) :
+      MutableBuffer(data, size) {}
+};
+
+// A ResizableBuffer whose memory is owned by the class instance. For example,
+// for reading data out of files that you want to deallocate when this class is
+// garbage-collected
+class OwnedMutableBuffer : public ResizableBuffer {
+ public:
+  OwnedMutableBuffer();
+  virtual void Resize(int64_t new_size);
+
+ private:
+  // TODO: aligned allocations
+  std::vector<uint8_t> buffer_owner_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_BUFFER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/util/input.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc
index 0e4b833..1df9060 100644
--- a/src/parquet/util/input.cc
+++ b/src/parquet/util/input.cc
@@ -21,10 +21,19 @@
 #include <string>
 
 #include "parquet/exception.h"
+#include "parquet/util/buffer.h"
 
 namespace parquet_cpp {
 
 // ----------------------------------------------------------------------
+// RandomAccessSource
+
+// Positional read built from the virtual primitives: move to `pos`, then
+// delegate to Read(nbytes). The position afterwards is wherever the
+// subclass's Read() leaves it.
+std::shared_ptr<Buffer> RandomAccessSource::ReadAt(int64_t pos, int64_t nbytes) {
+  this->Seek(pos);
+  std::shared_ptr<Buffer> out = this->Read(nbytes);
+  return out;
+}
+
+// ----------------------------------------------------------------------
 // LocalFileSource
 
 LocalFileSource::~LocalFileSource() {
@@ -49,32 +58,45 @@ void LocalFileSource::CloseFile() {
   }
 }
 
-size_t LocalFileSource::Size() {
+// Returns the file length in bytes. The current read position is saved and
+// restored, so calling Size() does not leave the cursor at EOF (which would
+// make a following Read() return no data).
+int64_t LocalFileSource::Size() {
+  const int64_t saved_pos = Tell();
   fseek(file_, 0L, SEEK_END);
-  return Tell();
+  const int64_t size = Tell();
+  Seek(saved_pos);
+  return size;
 }
 
-void LocalFileSource::Seek(size_t pos) {
+// Moves the read position to absolute byte offset `pos` (fseek, SEEK_SET).
+// NOTE(review): fseek takes a `long`, which is 32 bits on some platforms
+// (e.g. 64-bit Windows), so large offsets could be truncated there; the
+// fseek return value is also ignored.
+void LocalFileSource::Seek(int64_t pos) {
   fseek(file_, pos, SEEK_SET);
 }
 
-size_t LocalFileSource::Tell() {
+// Current read position as reported by ftell (which returns -1 on error).
+int64_t LocalFileSource::Tell() {
   return ftell(file_);
 }
 
-size_t LocalFileSource::Read(size_t nbytes, uint8_t* buffer) {
+// Reads up to nbytes into `buffer` with fread and returns the number of
+// bytes actually read, which is less than nbytes at end of file or on error.
+int64_t LocalFileSource::Read(int64_t nbytes, uint8_t* buffer) {
   return fread(buffer, 1, nbytes, file_);
 }
 
+// Reads up to nbytes from the current position into a freshly allocated,
+// self-owning buffer. On a short read (end of file or error) the buffer is
+// shrunk so that its size() equals the number of bytes actually read.
+std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) {
+  std::shared_ptr<OwnedMutableBuffer> out = std::make_shared<OwnedMutableBuffer>();
+  out->Resize(nbytes);
+
+  const int64_t filled = Read(nbytes, out->mutable_data());
+  if (filled < nbytes) {
+    out->Resize(filled);
+  }
+  return out;
+}
+
 // ----------------------------------------------------------------------
 // InMemoryInputStream
 
-InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) :
-    buffer_(buffer), len_(len), offset_(0) {}
+// Streams over the bytes of `buffer`, starting at offset 0. The stream keeps
+// a std::shared_ptr copy, so the bytes stay alive as long as the stream
+// does. `buffer` must be non-null (its size() is read here).
+InMemoryInputStream::InMemoryInputStream(const std::shared_ptr<Buffer>& buffer) :
+    buffer_(buffer), len_(buffer->size()), offset_(0) {}
 
+// Returns a pointer to the bytes at the current offset without consuming
+// them; *num_bytes is set to min(num_to_peek, len_ - offset_), i.e. the
+// request capped at the bytes remaining in the buffer.
 const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
   *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
-  return buffer_ + offset_;
+  return buffer_->data() + offset_;
 }
 
 const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
@@ -83,28 +105,8 @@ const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes
   return result;
 }
 
-// ----------------------------------------------------------------------
-// ScopedInMemoryInputStream:: like InMemoryInputStream but owns its memory
-
-ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) {
-  buffer_.resize(len);
-  stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size()));
-}
-
-uint8_t* ScopedInMemoryInputStream::data() {
-  return buffer_.data();
-}
-
-int64_t ScopedInMemoryInputStream::size() {
-  return buffer_.size();
-}
-
-const uint8_t* ScopedInMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
-  return stream_->Peek(num_to_peek, num_bytes);
-}
-
-const uint8_t* ScopedInMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
-  return stream_->Read(num_to_read, num_bytes);
+// Skips num_bytes bytes without returning them. The offset is clamped to the
+// end of the buffer, mirroring how Peek() caps its count at len_ - offset_,
+// so offset_ never exceeds len_ and a later Peek() cannot report a negative
+// size or return a pointer past the end of the buffer.
+void InMemoryInputStream::Advance(int64_t num_bytes) {
+  offset_ = std::min(offset_ + num_bytes, len_);
 }
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/util/input.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h
index d9b07fd..ac1b4b5 100644
--- a/src/parquet/util/input.h
+++ b/src/parquet/util/input.h
@@ -26,6 +26,8 @@
 
 namespace parquet_cpp {
 
+class Buffer;
+
 // ----------------------------------------------------------------------
 // Random access input (e.g. file-like)
 
@@ -35,12 +37,15 @@ class RandomAccessSource {
   virtual ~RandomAccessSource() {}
 
   virtual void Close() = 0;
-  virtual size_t Size() = 0;
-  virtual size_t Tell() = 0;
-  virtual void Seek(size_t pos) = 0;
+  // Total length in bytes, current byte offset, and absolute repositioning.
+  // Sizes/offsets are signed 64-bit, matching Buffer::size().
+  virtual int64_t Size() = 0;
+  virtual int64_t Tell() = 0;
+  virtual void Seek(int64_t pos) = 0;
 
   // Returns actual number of bytes read
-  virtual size_t Read(size_t nbytes, uint8_t* out) = 0;
+  virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;
+
+  // Reads up to nbytes from the current position and returns them as a
+  // Buffer. NOTE(review): implementations should make the Buffer's size()
+  // equal the bytes actually read, as LocalFileSource::Read does.
+  virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0;
+  // Non-virtual convenience: Seek(pos) followed by Read(nbytes). It moves the
+  // source's shared current position, so concurrent callers on the same
+  // source would need external synchronization.
+  std::shared_ptr<Buffer> ReadAt(int64_t pos, int64_t nbytes);
 };
 
 
@@ -52,12 +57,14 @@ class LocalFileSource : public RandomAccessSource {
   void Open(const std::string& path);
 
   virtual void Close();
-  virtual size_t Size();
-  virtual size_t Tell();
-  virtual void Seek(size_t pos);
+  // Implemented with fseek/ftell on the underlying FILE handle.
+  virtual int64_t Size();
+  virtual int64_t Tell();
+  virtual void Seek(int64_t pos);
 
   // Returns actual number of bytes read
-  virtual size_t Read(size_t nbytes, uint8_t* out);
+  virtual int64_t Read(int64_t nbytes, uint8_t* out);
+
+  // Allocates an OwnedMutableBuffer, fills it via Read(nbytes, out), and
+  // shrinks it to the number of bytes actually read.
+  virtual std::shared_ptr<Buffer> Read(int64_t nbytes);
 
   bool is_open() const { return is_open_;}
   const std::string& path() const { return path_;}
@@ -90,6 +97,9 @@ class InputStream {
   // *num_bytes.
   virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0;
 
+  // Advance the stream without reading: skip over num_bytes bytes so that
+  // the next Peek()/Read() starts after them.
+  virtual void Advance(int64_t num_bytes) = 0;
+
   virtual ~InputStream() {}
 
  protected:
@@ -99,31 +109,18 @@ class InputStream {
 // Implementation of an InputStream when all the bytes are in memory.
+// The bytes come from a shared Buffer; the stream tracks a read offset into it.
 class InMemoryInputStream : public InputStream {
  public:
-  InMemoryInputStream(const uint8_t* buffer, int64_t len);
+  // Holds a std::shared_ptr copy of `buffer`, keeping the bytes alive for the
+  // lifetime of the stream. `buffer` must be non-null.
+  explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer);
   virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
   virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
 
+  // Advances the read offset past num_bytes bytes without returning them.
+  virtual void Advance(int64_t num_bytes);
+
  private:
-  const uint8_t* buffer_;
+  std::shared_ptr<Buffer> buffer_;
+  // len_ caches buffer_->size(); offset_ is the current read position.
   int64_t len_;
   int64_t offset_;
 };
 
-
-// A wrapper for InMemoryInputStream to manage the memory.
-class ScopedInMemoryInputStream : public InputStream {
- public:
-  explicit ScopedInMemoryInputStream(int64_t len);
-  uint8_t* data();
-  int64_t size();
-  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
-  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
-
- private:
-  std::vector<uint8_t> buffer_;
-  std::unique_ptr<InMemoryInputStream> stream_;
-};
-
 } // namespace parquet_cpp
 
 #endif // PARQUET_UTIL_INPUT_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/util/output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output-test.cc b/src/parquet/util/output-test.cc
index 5fbca4a..bae184a 100644
--- a/src/parquet/util/output-test.cc
+++ b/src/parquet/util/output-test.cc
@@ -21,6 +21,7 @@
 #include <memory>
 #include <vector>
 
+#include "parquet/util/buffer.h"
 #include "parquet/util/output.h"
 #include "parquet/util/test-common.h"
 
@@ -35,12 +36,11 @@ TEST(TestInMemoryOutputStream, Basics) {
   ASSERT_EQ(4, stream->Tell());
   stream->Write(&data[4], data.size() - 4);
 
-  std::vector<uint8_t> out;
-  stream->Transfer(&out);
+  std::shared_ptr<Buffer> buffer = stream->GetBuffer();
 
-  test::assert_vector_equal(data, out);
+  Buffer data_buf(data.data(), data.size());
 
-  ASSERT_EQ(0, stream->Tell());
+  ASSERT_TRUE(data_buf.Equals(*buffer));
 }
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/util/output.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.cc b/src/parquet/util/output.cc
index f0c8989..b28421e 100644
--- a/src/parquet/util/output.cc
+++ b/src/parquet/util/output.cc
@@ -18,8 +18,10 @@
 #include "parquet/util/output.h"
 
 #include <cstring>
+#include <memory>
 
 #include "parquet/exception.h"
+#include "parquet/util/buffer.h"
 
 namespace parquet_cpp {
 
@@ -34,14 +36,15 @@ InMemoryOutputStream::InMemoryOutputStream(int64_t initial_capacity) :
   if (initial_capacity == 0) {
     initial_capacity = IN_MEMORY_DEFAULT_CAPACITY;
   }
-  buffer_.resize(initial_capacity);
+  buffer_.reset(new OwnedMutableBuffer());
+  buffer_->Resize(initial_capacity);
 }
 
+// Default-capacity stream; delegates to the sizing constructor.
 InMemoryOutputStream::InMemoryOutputStream() :
     InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY) {}
 
+// Pointer to the first unwritten byte (offset size_ into the buffer). Only
+// meaningful while buffer_ is non-null, and presumably invalidated by the
+// next buffer_->Resize() (the owned storage may be reallocated).
 uint8_t* InMemoryOutputStream::Head() {
-  return &buffer_[size_];
+  return buffer_->mutable_data() + size_;
 }
 
 void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) {
@@ -50,7 +53,7 @@ void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) {
     while (new_capacity < size_ + length) {
       new_capacity *= 2;
     }
-    buffer_.resize(new_capacity);
+    buffer_->Resize(new_capacity);
     capacity_ = new_capacity;
   }
   memcpy(Head(), data, length);
@@ -61,11 +64,11 @@ int64_t InMemoryOutputStream::Tell() {
   return size_;
 }
 
-void InMemoryOutputStream::Transfer(std::vector<uint8_t>* out) {
-  buffer_.resize(size_);
-  buffer_.swap(*out);
-  size_ = 0;
-  capacity_ = buffer_.size();
+// Returns everything written so far as a Buffer sized to exactly Tell()
+// bytes and hands ownership of it to the caller. The stream then starts over
+// with an empty default-capacity buffer, so it can keep being written to and
+// Tell() reports 0 again.
+std::shared_ptr<Buffer> InMemoryOutputStream::GetBuffer() {
+  // Trim the backing buffer to the bytes actually written before handing it off.
+  buffer_->Resize(size_);
+  std::shared_ptr<Buffer> result = buffer_;
+
+  // Install a fresh buffer instead of leaving buffer_ null. With a null
+  // buffer_ and stale size_/capacity_, a later Write() would dereference
+  // nullptr through Head(). A non-zero capacity also keeps Write()'s
+  // doubling growth loop from starting at 0.
+  buffer_ = std::make_shared<OwnedMutableBuffer>();
+  buffer_->Resize(IN_MEMORY_DEFAULT_CAPACITY);
+  size_ = 0;
+  capacity_ = IN_MEMORY_DEFAULT_CAPACITY;
+  return result;
 }
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/b259188d/src/parquet/util/output.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h
index 68a09e2..2a43a36 100644
--- a/src/parquet/util/output.h
+++ b/src/parquet/util/output.h
@@ -19,12 +19,16 @@
 #define PARQUET_UTIL_OUTPUT_H
 
 #include <cstdint>
+#include <memory>
 #include <vector>
 
 #include "parquet/util/macros.h"
 
 namespace parquet_cpp {
 
+class Buffer;
+class ResizableBuffer;
+
 // ----------------------------------------------------------------------
 // Output stream classes
 
@@ -55,14 +59,14 @@ class InMemoryOutputStream : public OutputStream {
 
   virtual void Write(const uint8_t* data, int64_t length);
 
-  // Hand off the in-memory data to a (preferably-empty) std::vector owner
-  void Transfer(std::vector<uint8_t>* out);
+  // Return complete stream as Buffer: everything written so far, trimmed to
+  // Tell() bytes. Ownership of those bytes passes to the returned Buffer;
+  // the stream no longer refers to them afterwards.
+  std::shared_ptr<Buffer> GetBuffer();
 
  private:
   // Mutable pointer to the current write position in the stream
   uint8_t* Head();
 
-  std::vector<uint8_t> buffer_;
+  std::shared_ptr<ResizableBuffer> buffer_;
   int64_t size_;
   int64_t capacity_;
 


Mime
View raw message