parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jul...@apache.org
Subject parquet-cpp git commit: PARQUET-472: Changed the ownership of InputStream in ColumnReader.
Date Fri, 29 Jan 2016 17:01:52 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master 89f5a5439 -> 5e938171b


PARQUET-472: Changed the ownership of InputStream in ColumnReader.

Author: Aliaksei Sandryhaila <aliaksei.sandryhaila@hp.com>

Closes #29 from asandryh/parquet-472 and squashes the following commits:

4bcbbb1 [Aliaksei Sandryhaila] Addressed review comments.
58c2da2 [Aliaksei Sandryhaila] PARQUET-472: Changed the ownership of InputStream in ColumnReader.


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/5e938171
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/5e938171
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/5e938171

Branch: refs/heads/master
Commit: 5e938171b5b6d1effad27559b001c42f7ee0bb2c
Parents: 89f5a54
Author: Aliaksei Sandryhaila <aliaksei.sandryhaila@hp.com>
Authored: Fri Jan 29 09:01:44 2016 -0800
Committer: Julien Le Dem <julien@dremio.com>
Committed: Fri Jan 29 09:01:44 2016 -0800

----------------------------------------------------------------------
 src/parquet/column_reader.cc | 27 +++++++++++----------------
 src/parquet/column_reader.h  | 14 ++++++--------
 src/parquet/reader.cc        | 10 +++++-----
 3 files changed, 22 insertions(+), 29 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e938171/src/parquet/column_reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
index 4f565cf..7c88a32 100644
--- a/src/parquet/column_reader.cc
+++ b/src/parquet/column_reader.cc
@@ -36,16 +36,11 @@ using parquet::FieldRepetitionType;
 using parquet::PageType;
 using parquet::Type;
 
-
-ColumnReader::~ColumnReader() {
-  delete stream_;
-}
-
 ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
-    const parquet::SchemaElement* schema, InputStream* stream)
+    const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream)
   : metadata_(metadata),
     schema_(schema),
-    stream_(stream),
+    stream_(std::move(stream)),
     num_buffered_values_(0),
     num_decoded_values_(0),
     buffered_values_offset_(0) {
@@ -171,24 +166,24 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
 }
 
 std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata,
-    const parquet::SchemaElement* element, InputStream* stream) {
+    const parquet::SchemaElement* element, std::unique_ptr<InputStream> stream) {
   switch (metadata->type) {
     case Type::BOOLEAN:
-      return std::make_shared<BoolReader>(metadata, element, stream);
+      return std::make_shared<BoolReader>(metadata, element, std::move(stream));
     case Type::INT32:
-      return std::make_shared<Int32Reader>(metadata, element, stream);
+      return std::make_shared<Int32Reader>(metadata, element, std::move(stream));
     case Type::INT64:
-      return std::make_shared<Int64Reader>(metadata, element, stream);
+      return std::make_shared<Int64Reader>(metadata, element, std::move(stream));
     case Type::INT96:
-      return std::make_shared<Int96Reader>(metadata, element, stream);
+      return std::make_shared<Int96Reader>(metadata, element, std::move(stream));
     case Type::FLOAT:
-      return std::make_shared<FloatReader>(metadata, element, stream);
+      return std::make_shared<FloatReader>(metadata, element, std::move(stream));
     case Type::DOUBLE:
-      return std::make_shared<DoubleReader>(metadata, element, stream);
+      return std::make_shared<DoubleReader>(metadata, element, std::move(stream));
     case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayReader>(metadata, element, stream);
+      return std::make_shared<ByteArrayReader>(metadata, element, std::move(stream));
     case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayReader>(metadata, element, stream);
+      return std::make_shared<FixedLenByteArrayReader>(metadata, element, std::move(stream));
     default:
       ParquetException::NYI("type reader not implemented");
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e938171/src/parquet/column_reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
index 00722f5..3d27549 100644
--- a/src/parquet/column_reader.h
+++ b/src/parquet/column_reader.h
@@ -62,13 +62,11 @@ class ColumnReader {
     }
   };
 
-  ColumnReader(const parquet::ColumnMetaData*,
-      const parquet::SchemaElement*, InputStream* stream);
-
-  virtual ~ColumnReader();
+  ColumnReader(const parquet::ColumnMetaData*, const parquet::SchemaElement*,
+      std::unique_ptr<InputStream> stream);
 
   static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*,
-      const parquet::SchemaElement*, InputStream* stream);
+      const parquet::SchemaElement*, std::unique_ptr<InputStream> stream);
 
   virtual bool ReadNewPage() = 0;
 
@@ -97,7 +95,7 @@ class ColumnReader {
 
   const parquet::ColumnMetaData* metadata_;
   const parquet::SchemaElement* schema_;
-  InputStream* stream_;
+  std::unique_ptr<InputStream> stream_;
 
   // Compression codec to use.
   std::unique_ptr<Codec> decompressor_;
@@ -123,8 +121,8 @@ class TypedColumnReader : public ColumnReader {
   typedef typename type_traits<TYPE>::value_type T;
 
   TypedColumnReader(const parquet::ColumnMetaData* metadata,
-      const parquet::SchemaElement* schema, InputStream* stream) :
-      ColumnReader(metadata, schema, stream),
+      const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream) :
+      ColumnReader(metadata, schema, std::move(stream)),
       current_decoder_(NULL) {
     size_t value_byte_size = type_traits<TYPE>::value_byte_size;
     values_buffer_.resize(config_.batch_size * value_byte_size);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/5e938171/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index 4654f14..d467b75 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -98,7 +98,7 @@ ColumnReader* RowGroupReader::Column(size_t i) {
     col_start = col.meta_data.dictionary_page_offset;
   }
 
-  std::unique_ptr<ScopedInMemoryInputStream> input(
+  std::unique_ptr<InputStream> input(
       new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
 
   FileLike* source = this->parent_->buffer_;
@@ -106,9 +106,9 @@ ColumnReader* RowGroupReader::Column(size_t i) {
   source->Seek(col_start);
 
   // TODO(wesm): Law of demeter violation
-  size_t bytes_read = source->Read(input->size(), input->data());
-
-  if (bytes_read != input->size()) {
+  ScopedInMemoryInputStream* scoped_input = static_cast<ScopedInMemoryInputStream*>(input.get());
+  size_t bytes_read = source->Read(scoped_input->size(), scoped_input->data());
+  if (bytes_read != scoped_input->size()) {
     std::cout << "Bytes needed: " << col.meta_data.total_compressed_size << std::endl;
     std::cout << "Bytes read: " << bytes_read << std::endl;
     throw ParquetException("Unable to read column chunk data");
@@ -116,7 +116,7 @@ ColumnReader* RowGroupReader::Column(size_t i) {
 
   // TODO(wesm): This presumes a flat schema
   std::shared_ptr<ColumnReader> reader = ColumnReader::Make(&col.meta_data,
-      &this->parent_->metadata_.schema[i + 1], input.release());
+      &this->parent_->metadata_.schema[i + 1], std::move(input));
   column_readers_[i] = reader;
 
   return reader.get();


Mime
View raw message