parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From w...@apache.org
Subject parquet-cpp git commit: PARQUET-589: Implement BufferedInputStream for better memory usage
Date Thu, 05 May 2016 04:29:21 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master b7b7fa22f -> 8a6eca9d1


PARQUET-589: Implement BufferedInputStream for better memory usage

Resolves PARQUET-576 and PARQUET-590

Author: Deepak Majeti <deepak.majeti@hpe.com>

Closes #89 from majetideepak/chunkedInMemory and squashes the following commits:

c529616 [Deepak Majeti] review comments
400e4f7 [Deepak Majeti] removed inline for functions with switch
9468051 [Deepak Majeti] review comments
47b8cae [Deepak Majeti] fixed clang-format issues
364ec9d [Deepak Majeti] rebased and resolved conflicts
85cd30d [Deepak Majeti] review comments
ccf5049 [Deepak Majeti] addressed reviews and added tests
860e40b [Deepak Majeti] added Reader and Writer Properties
098455c [Deepak Majeti] PARQUET-589: Implement Chunked InMemoryInputStream for better memory usage


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/8a6eca9d
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/8a6eca9d
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/8a6eca9d

Branch: refs/heads/master
Commit: 8a6eca9d12cd7f5e2bdb95679e98d0ae6aea89af
Parents: b7b7fa2
Author: Deepak Majeti <deepak.majeti@hpe.com>
Authored: Wed May 4 21:29:13 2016 -0700
Committer: Wes McKinney <wesm@apache.org>
Committed: Wed May 4 21:29:13 2016 -0700

----------------------------------------------------------------------
 CMakeLists.txt                        |   2 +
 example/parquet_reader.cc             |   2 +-
 src/parquet/column/CMakeLists.txt     |   1 +
 src/parquet/column/properties-test.cc |  45 ++++++++
 src/parquet/column/properties.h       | 115 +++++++++++++++++++
 src/parquet/column/reader.cc          |   6 +
 src/parquet/column/writer.cc          |   6 +
 src/parquet/file/reader-internal.cc   |  24 ++--
 src/parquet/file/reader-internal.h    |  13 ++-
 src/parquet/file/reader.cc            |  21 ++--
 src/parquet/file/reader.h             |   5 +-
 src/parquet/types-test.cc             |  63 +++++++++++
 src/parquet/types.cc                  | 170 +++++++++++++++++++++++++++++
 src/parquet/types.h                   | 102 +----------------
 src/parquet/util/input-output-test.cc |  61 +++++++++++
 src/parquet/util/input.cc             |  54 +++++++++
 src/parquet/util/input.h              |  21 ++++
 17 files changed, 583 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 39f7585..a16b41b 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -441,6 +441,8 @@ endif()
 # Library config
 
 set(LIBPARQUET_SRCS
+  src/parquet/types.cc
+
   src/parquet/column/reader.cc
   src/parquet/column/writer.cc
   src/parquet/column/scanner.cc

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/example/parquet_reader.cc
----------------------------------------------------------------------
diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc
index cf90d99..c255416 100644
--- a/example/parquet_reader.cc
+++ b/example/parquet_reader.cc
@@ -24,7 +24,7 @@
 using namespace parquet;
 
 int main(int argc, char** argv) {
-  if (argc > 3) {
+  if (argc > 5 || argc < 2) {
     std::cerr << "Usage: parquet_reader [--only-stats] [--no-memory-map] [--columns=...]
<file>"
               << std::endl;
     return -1;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/column/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt
index ace0072..bc380f3 100644
--- a/src/parquet/column/CMakeLists.txt
+++ b/src/parquet/column/CMakeLists.txt
@@ -26,6 +26,7 @@ install(FILES
 ADD_PARQUET_TEST(column-reader-test)
 ADD_PARQUET_TEST(column-writer-test)
 ADD_PARQUET_TEST(levels-test)
+ADD_PARQUET_TEST(properties-test)
 ADD_PARQUET_TEST(scanner-test)
 
 ADD_PARQUET_BENCHMARK(column-io-benchmark)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/column/properties-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties-test.cc b/src/parquet/column/properties-test.cc
new file mode 100644
index 0000000..42be045
--- /dev/null
+++ b/src/parquet/column/properties-test.cc
@@ -0,0 +1,45 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <string>
+
+#include "parquet/column/properties.h"
+#include "parquet/file/reader.h"
+
+namespace parquet {
+
+namespace test {
+
+// A default-constructed ReaderProperties reports the library defaults, and each
+// setter round-trips through its matching getter.
+TEST(TestReaderProperties, Basics) {
+  ReaderProperties props;
+
+  ASSERT_EQ(DEFAULT_BUFFER_SIZE, props.buffer_size());
+  ASSERT_EQ(DEFAULT_USE_BUFFERED_STREAM, props.is_buffered_stream_enabled());
+
+  props.enable_buffered_stream();
+  ASSERT_TRUE(props.is_buffered_stream_enabled());
+  props.disable_buffered_stream();
+  ASSERT_FALSE(props.is_buffered_stream_enabled());
+
+  props.set_buffer_size(4096);
+  ASSERT_EQ(4096, props.buffer_size());
+}
+
+// A default-constructed WriterProperties reports the library defaults, and each
+// setter round-trips through its matching getter.
+TEST(TestWriterProperties, Basics) {
+  WriterProperties props;
+
+  ASSERT_EQ(DEFAULT_PAGE_SIZE, props.data_pagesize());
+  ASSERT_EQ(DEFAULT_DICTIONARY_PAGE_SIZE, props.dictionary_pagesize());
+  ASSERT_EQ(DEFAULT_IS_DICTIONARY_ENABLED, props.is_dictionary_enabled());
+
+  props.set_data_pagesize(512);
+  ASSERT_EQ(512, props.data_pagesize());
+  props.set_dictionary_pagesize(256);
+  ASSERT_EQ(256, props.dictionary_pagesize());
+
+  props.disable_dictionary();
+  ASSERT_FALSE(props.is_dictionary_enabled());
+  props.enable_dictionary();
+  ASSERT_TRUE(props.is_dictionary_enabled());
+}
+
+}  // namespace test
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/column/properties.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/properties.h b/src/parquet/column/properties.h
new file mode 100644
index 0000000..40d04c3
--- /dev/null
+++ b/src/parquet/column/properties.h
@@ -0,0 +1,115 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_PROPERTIES_H
+#define PARQUET_COLUMN_PROPERTIES_H
+
+#include <memory>
+#include <string>
+
+#include "parquet/util/input.h"
+#include "parquet/util/mem-allocator.h"
+#include "parquet/types.h"
+
+namespace parquet {
+
+// Library defaults for ReaderProperties. They are constexpr so that every
+// translation unit including this header sees one immutable value instead of
+// its own writable copy of a mutable header-level static.
+static constexpr int64_t DEFAULT_BUFFER_SIZE = 0;
+static constexpr bool DEFAULT_USE_BUFFERED_STREAM = false;
+
+// Reader-side options: the memory allocator and whether column-chunk data is
+// read through a BufferedInputStream (with buffer_size()) or an
+// InMemoryInputStream.
+class ReaderProperties {
+ public:
+  explicit ReaderProperties(MemoryAllocator* allocator = default_allocator())
+      : allocator_(allocator),
+        buffer_size_(DEFAULT_BUFFER_SIZE),
+        buffered_stream_enabled_(DEFAULT_USE_BUFFERED_STREAM) {}
+
+  MemoryAllocator* allocator() const { return allocator_; }
+
+  // Returns a stream over `num_bytes` bytes of `source` starting at `start`.
+  // When buffered streaming is enabled this is a BufferedInputStream using
+  // allocator() and buffer_size(); otherwise an InMemoryInputStream.
+  std::unique_ptr<InputStream> GetStream(
+      RandomAccessSource* source, int64_t start, int64_t num_bytes) const {
+    if (buffered_stream_enabled_) {
+      return std::unique_ptr<InputStream>(
+          new BufferedInputStream(allocator_, buffer_size_, source, start, num_bytes));
+    }
+    return std::unique_ptr<InputStream>(
+        new InMemoryInputStream(source, start, num_bytes));
+  }
+
+  bool is_buffered_stream_enabled() const { return buffered_stream_enabled_; }
+
+  void enable_buffered_stream() { buffered_stream_enabled_ = true; }
+
+  void disable_buffered_stream() { buffered_stream_enabled_ = false; }
+
+  void set_buffer_size(int64_t buf_size) { buffer_size_ = buf_size; }
+
+  int64_t buffer_size() const { return buffer_size_; }
+
+ private:
+  MemoryAllocator* allocator_;
+  int64_t buffer_size_;
+  bool buffered_stream_enabled_;
+};
+
+// Returns a copy of the process-wide default ReaderProperties.
+ReaderProperties default_reader_properties();
+
+// Library defaults for WriterProperties. They are constexpr so that every
+// translation unit including this header sees one immutable value instead of
+// its own writable copy of a mutable header-level static.
+static constexpr int64_t DEFAULT_PAGE_SIZE = 1024 * 1024;
+static constexpr int64_t DEFAULT_DICTIONARY_PAGE_SIZE = DEFAULT_PAGE_SIZE;
+static constexpr bool DEFAULT_IS_DICTIONARY_ENABLED = true;
+
+// Writer-side options: data and dictionary page sizes, whether dictionary
+// encoding is enabled, and the memory allocator.
+class WriterProperties {
+ public:
+  explicit WriterProperties(MemoryAllocator* allocator = default_allocator())
+      : pagesize_(DEFAULT_PAGE_SIZE),
+        dictionary_pagesize_(DEFAULT_DICTIONARY_PAGE_SIZE),
+        dictionary_enabled_(DEFAULT_IS_DICTIONARY_ENABLED),
+        allocator_(allocator) {}
+
+  int64_t dictionary_pagesize() const { return dictionary_pagesize_; }
+
+  void set_dictionary_pagesize(int64_t dictionary_psize) {
+    dictionary_pagesize_ = dictionary_psize;
+  }
+
+  int64_t data_pagesize() const { return pagesize_; }
+
+  void set_data_pagesize(int64_t pg_size) { pagesize_ = pg_size; }
+
+  void enable_dictionary() { dictionary_enabled_ = true; }
+
+  void disable_dictionary() { dictionary_enabled_ = false; }
+
+  bool is_dictionary_enabled() const { return dictionary_enabled_; }
+
+  MemoryAllocator* allocator() const { return allocator_; }
+
+ private:
+  int64_t pagesize_;
+  int64_t dictionary_pagesize_;
+  bool dictionary_enabled_;
+  MemoryAllocator* allocator_;
+};
+
+// Returns a copy of the process-wide default WriterProperties.
+WriterProperties default_writer_properties();
+
+}  // namespace parquet
+
+#endif  // PARQUET_COLUMN_PROPERTIES_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index a13dfd3..f79cb9d 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -22,12 +22,18 @@
 #include <memory>
 
 #include "parquet/column/page.h"
+#include "parquet/column/properties.h"
 
 #include "parquet/encodings/dictionary-encoding.h"
 #include "parquet/encodings/plain-encoding.h"
 
 namespace parquet {
 
+ReaderProperties default_reader_properties() {
+  // Constructed once on first call (function-local static) and returned by
+  // value, so callers may tweak their copy without touching the shared one.
+  static const ReaderProperties kDefaults;
+  return kDefaults;
+}
+
 ColumnReader::ColumnReader(const ColumnDescriptor* descr,
     std::unique_ptr<PageReader> pager, MemoryAllocator* allocator)
     : descr_(descr),

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 584b8a7..0472adb 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -17,6 +17,7 @@
 
 #include "parquet/column/writer.h"
 
+#include "parquet/column/properties.h"
 #include "parquet/encodings/plain-encoding.h"
 
 namespace parquet {
@@ -24,6 +25,11 @@ namespace parquet {
 // ----------------------------------------------------------------------
 // ColumnWriter
 
+WriterProperties default_writer_properties() {
+  // Constructed once on first call (function-local static) and returned by
+  // value, so callers may tweak their copy without touching the shared one.
+  static const WriterProperties kDefaults;
+  return kDefaults;
+}
+
 ColumnWriter::ColumnWriter(const ColumnDescriptor* descr,
     std::unique_ptr<PageWriter> pager, int64_t expected_rows, MemoryAllocator* allocator)
     : descr_(descr),

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 905c357..0aadc78 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -163,15 +163,12 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
   }
 
   int64_t bytes_to_read = col.meta_data.total_compressed_size;
-  std::shared_ptr<Buffer> buffer = source_->ReadAt(col_start, bytes_to_read);
+  std::unique_ptr<InputStream> stream;
 
-  if (buffer->size() < bytes_to_read) {
-    throw ParquetException("Unable to read column chunk data");
-  }
+  stream = properties_.GetStream(source_, col_start, bytes_to_read);
 
-  std::unique_ptr<InputStream> stream(new InMemoryInputStream(buffer));
   return std::unique_ptr<PageReader>(new SerializedPageReader(
-      std::move(stream), FromThrift(col.meta_data.codec), allocator_));
+      std::move(stream), FromThrift(col.meta_data.codec), properties_.allocator()));
 }
 
 RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) {
@@ -194,9 +191,9 @@ static constexpr uint32_t FOOTER_SIZE = 8;
 static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
 
 std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open(
-    std::unique_ptr<RandomAccessSource> source, MemoryAllocator* allocator) {
+    std::unique_ptr<RandomAccessSource> source, ReaderProperties props) {
   std::unique_ptr<ParquetFileReader::Contents> result(
-      new SerializedFile(std::move(source), allocator));
+      new SerializedFile(std::move(source), props));
 
   // Access private methods here, but otherwise unavailable
   SerializedFile* file = static_cast<SerializedFile*>(result.get());
@@ -217,9 +214,10 @@ SerializedFile::~SerializedFile() {
 
 std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
   std::unique_ptr<SerializedRowGroup> contents(
-      new SerializedRowGroup(source_.get(), &metadata_.row_groups[i], allocator_));
+      new SerializedRowGroup(source_.get(), &metadata_.row_groups[i], properties_));
 
-  return std::make_shared<RowGroupReader>(&schema_, std::move(contents), allocator_);
+  return std::make_shared<RowGroupReader>(
+      &schema_, std::move(contents), properties_.allocator());
 }
 
 int64_t SerializedFile::num_rows() const {
@@ -235,8 +233,8 @@ int SerializedFile::num_row_groups() const {
 }
 
 SerializedFile::SerializedFile(std::unique_ptr<RandomAccessSource> source,
-    MemoryAllocator* allocator = default_allocator())
-    : source_(std::move(source)), allocator_(allocator) {}
+    ReaderProperties props = default_reader_properties())
+    : source_(std::move(source)), properties_(props) {}
 
 void SerializedFile::ParseMetaData() {
   int64_t filesize = source_->Size();
@@ -261,7 +259,7 @@ void SerializedFile::ParseMetaData() {
   }
   source_->Seek(metadata_start);
 
-  OwnedMutableBuffer metadata_buffer(metadata_len, allocator_);
+  OwnedMutableBuffer metadata_buffer(metadata_len, properties_.allocator());
   bytes_read = source_->Read(metadata_len, &metadata_buffer[0]);
   if (bytes_read != metadata_len) {
     throw ParquetException("Invalid parquet file. Could not read metadata bytes.");

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index 797ff93..43d31a9 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include "parquet/column/page.h"
+#include "parquet/column/properties.h"
 #include "parquet/compression/codec.h"
 #include "parquet/file/reader.h"
 #include "parquet/thrift/parquet_types.h"
@@ -69,8 +70,8 @@ class SerializedPageReader : public PageReader {
 class SerializedRowGroup : public RowGroupReader::Contents {
  public:
   SerializedRowGroup(RandomAccessSource* source, const format::RowGroup* metadata,
-      MemoryAllocator* allocator)
-      : source_(source), metadata_(metadata), allocator_(allocator) {}
+      ReaderProperties props)
+      : source_(source), metadata_(metadata), properties_(props) {}
 
   virtual int num_columns() const;
   virtual int64_t num_rows() const;
@@ -80,7 +81,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
  private:
   RandomAccessSource* source_;
   const format::RowGroup* metadata_;
-  MemoryAllocator* allocator_;
+  ReaderProperties properties_;
 };
 
 // An implementation of ParquetFileReader::Contents that deals with the Parquet
@@ -94,7 +95,7 @@ class SerializedFile : public ParquetFileReader::Contents {
   // lifetime separately
   static std::unique_ptr<ParquetFileReader::Contents> Open(
       std::unique_ptr<RandomAccessSource> source,
-      MemoryAllocator* allocator = default_allocator());
+      ReaderProperties props = default_reader_properties());
   virtual void Close();
   virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i);
   virtual int64_t num_rows() const;
@@ -105,11 +106,11 @@ class SerializedFile : public ParquetFileReader::Contents {
  private:
   // This class takes ownership of the provided data source
   explicit SerializedFile(
-      std::unique_ptr<RandomAccessSource> source, MemoryAllocator* allocator);
+      std::unique_ptr<RandomAccessSource> source, ReaderProperties props);
 
   std::unique_ptr<RandomAccessSource> source_;
   format::FileMetaData metadata_;
-  MemoryAllocator* allocator_;
+  ReaderProperties properties_;
 
   void ParseMetaData();
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 232dbe4..cf65f29 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -73,8 +73,8 @@ ParquetFileReader::~ParquetFileReader() {
 }
 
 std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
-    std::unique_ptr<RandomAccessSource> source, MemoryAllocator* allocator) {
-  auto contents = SerializedFile::Open(std::move(source), allocator);
+    std::unique_ptr<RandomAccessSource> source, ReaderProperties props) {
+  auto contents = SerializedFile::Open(std::move(source), props);
 
   std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
   result->Open(std::move(contents));
@@ -83,16 +83,16 @@ std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
 }
 
 std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
-    const std::string& path, bool memory_map, MemoryAllocator* allocator) {
+    const std::string& path, bool memory_map, ReaderProperties props) {
   std::unique_ptr<LocalFileSource> file;
   if (memory_map) {
-    file.reset(new MemoryMapSource(allocator));
+    file.reset(new MemoryMapSource(props.allocator()));
   } else {
-    file.reset(new LocalFileSource(allocator));
+    file.reset(new LocalFileSource(props.allocator()));
   }
   file->Open(path);
 
-  return Open(std::move(file), allocator);
+  return Open(std::move(file), props);
 }
 
 void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents)
{
@@ -165,10 +165,15 @@ void ParquetFileReader::DebugPrint(
     for (auto i : selected_columns) {
       RowGroupStatistics stats = group_reader->GetColumnStats(i);
 
+      const ColumnDescriptor* descr = schema_->Column(i);
       stream << "Column " << i << ": " << group_reader->num_rows()
<< " rows, "
              << stats.num_values << " values, " << stats.null_count <<
" null values, "
-             << stats.distinct_count << " distinct values, " << *stats.max
<< " max, "
-             << *stats.min << " min, " << std::endl;
+             << stats.distinct_count << " distinct values, "
+             << FormatValue(
+                    descr->physical_type(), stats.max->c_str(), descr->type_length())
+             << " max, " << FormatValue(descr->physical_type(), stats.min->c_str(),
+                                descr->type_length())
+             << " min, " << std::endl;
     }
 
     if (!print_values) { continue; }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 565bb5c..2d2721f 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -25,6 +25,7 @@
 #include <string>
 
 #include "parquet/column/page.h"
+#include "parquet/column/properties.h"
 #include "parquet/schema/descriptor.h"
 
 namespace parquet {
@@ -97,11 +98,11 @@ class ParquetFileReader {
 
   // API Convenience to open a serialized Parquet file on disk
   static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path,
-      bool memory_map = true, MemoryAllocator* allocator = default_allocator());
+      bool memory_map = true, ReaderProperties props = default_reader_properties());
 
   static std::unique_ptr<ParquetFileReader> Open(
       std::unique_ptr<RandomAccessSource> source,
-      MemoryAllocator* allocator = default_allocator());
+      ReaderProperties props = default_reader_properties());
 
   void Open(std::unique_ptr<Contents> contents);
   void Close();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/types-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/types-test.cc b/src/parquet/types-test.cc
index f1f829d..b31e39d 100644
--- a/src/parquet/types-test.cc
+++ b/src/parquet/types-test.cc
@@ -60,4 +60,67 @@ TEST(TestLogicalTypeToString, LogicalTypes) {
   ASSERT_STREQ("INTERVAL", logical_type_to_string(LogicalType::INTERVAL).c_str());
 }
 
+// FormatValue receives the raw in-memory bytes of a value (DebugPrint passes the
+// bytes of the statistics min/max strings), so each case below serializes a
+// native value into a std::string and checks the rendered text.
+TEST(TypePrinter, PhysicalTypes) {
+  std::string smin;
+  std::string smax;
+
+  bool bool_false = false;
+  bool bool_true = true;
+  smin = std::string(reinterpret_cast<char*>(&bool_false), sizeof(bool));
+  smax = std::string(reinterpret_cast<char*>(&bool_true), sizeof(bool));
+  ASSERT_STREQ("0", FormatValue(Type::BOOLEAN, smin.c_str(), 0).c_str());
+  ASSERT_STREQ("1", FormatValue(Type::BOOLEAN, smax.c_str(), 0).c_str());
+
+  int32_t int_min = 1024;
+  int32_t int_max = 2048;
+  smin = std::string(reinterpret_cast<char*>(&int_min), sizeof(int32_t));
+  smax = std::string(reinterpret_cast<char*>(&int_max), sizeof(int32_t));
+  ASSERT_STREQ("1024", FormatValue(Type::INT32, smin.c_str(), 0).c_str());
+  ASSERT_STREQ("2048", FormatValue(Type::INT32, smax.c_str(), 0).c_str());
+
+  int64_t int64_min = 10240000000000;
+  int64_t int64_max = 20480000000000;
+  smin = std::string(reinterpret_cast<char*>(&int64_min), sizeof(int64_t));
+  smax = std::string(reinterpret_cast<char*>(&int64_max), sizeof(int64_t));
+  ASSERT_STREQ("10240000000000", FormatValue(Type::INT64, smin.c_str(), 0).c_str());
+  ASSERT_STREQ("20480000000000", FormatValue(Type::INT64, smax.c_str(), 0).c_str());
+
+  // Float literals: initializing a float from a double literal is an implicit
+  // narrowing conversion that -Wfloat-conversion reports.
+  float float_min = 1.024f;
+  float float_max = 2.048f;
+  smin = std::string(reinterpret_cast<char*>(&float_min), sizeof(float));
+  smax = std::string(reinterpret_cast<char*>(&float_max), sizeof(float));
+  ASSERT_STREQ("1.024", FormatValue(Type::FLOAT, smin.c_str(), 0).c_str());
+  ASSERT_STREQ("2.048", FormatValue(Type::FLOAT, smax.c_str(), 0).c_str());
+
+  double double_min = 1.0245;
+  double double_max = 2.0489;
+  smin = std::string(reinterpret_cast<char*>(&double_min), sizeof(double));
+  smax = std::string(reinterpret_cast<char*>(&double_max), sizeof(double));
+  ASSERT_STREQ("1.0245", FormatValue(Type::DOUBLE, smin.c_str(), 0).c_str());
+  ASSERT_STREQ("2.0489", FormatValue(Type::DOUBLE, smax.c_str(), 0).c_str());
+
+  Int96 Int96_min = {{1024, 2048, 4096}};
+  Int96 Int96_max = {{2048, 4096, 8192}};
+  smin = std::string(reinterpret_cast<char*>(&Int96_min), sizeof(Int96));
+  smax = std::string(reinterpret_cast<char*>(&Int96_max), sizeof(Int96));
+  ASSERT_STREQ("1024 2048 4096 ", FormatValue(Type::INT96, smin.c_str(), 0).c_str());
+  ASSERT_STREQ("2048 4096 8192 ", FormatValue(Type::INT96, smax.c_str(), 0).c_str());
+
+  // Byte-array types are passed as the bytes of the ByteArray/FLBA struct
+  // itself (pointer + length), not the bytes they point to.
+  ByteArray BA_min;
+  ByteArray BA_max;
+  BA_min.ptr = reinterpret_cast<const uint8_t*>("abcdef");
+  BA_min.len = 6;
+  BA_max.ptr = reinterpret_cast<const uint8_t*>("ijklmnop");
+  BA_max.len = 8;
+  smin = std::string(reinterpret_cast<char*>(&BA_min), sizeof(ByteArray));
+  smax = std::string(reinterpret_cast<char*>(&BA_max), sizeof(ByteArray));
+  ASSERT_STREQ("a b c d e f ", FormatValue(Type::BYTE_ARRAY, smin.c_str(), 0).c_str());
+  ASSERT_STREQ(
+      "i j k l m n o p ", FormatValue(Type::BYTE_ARRAY, smax.c_str(), 0).c_str());
+
+  FLBA FLBA_min;
+  FLBA FLBA_max;
+  FLBA_min.ptr = reinterpret_cast<const uint8_t*>("abcdefgh");
+  FLBA_max.ptr = reinterpret_cast<const uint8_t*>("ijklmnop");
+  int len = 8;
+  smin = std::string(reinterpret_cast<char*>(&FLBA_min), sizeof(FLBA));
+  smax = std::string(reinterpret_cast<char*>(&FLBA_max), sizeof(FLBA));
+  ASSERT_STREQ("a b c d e f g h ",
+      FormatValue(Type::FIXED_LEN_BYTE_ARRAY, smin.c_str(), len).c_str());
+  ASSERT_STREQ("i j k l m n o p ",
+      FormatValue(Type::FIXED_LEN_BYTE_ARRAY, smax.c_str(), len).c_str());
+}
+
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/types.cc
----------------------------------------------------------------------
diff --git a/src/parquet/types.cc b/src/parquet/types.cc
new file mode 100644
index 0000000..2b5d6b3
--- /dev/null
+++ b/src/parquet/types.cc
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstring>
+#include <sstream>
+#include <string>
+
+#include "parquet/types.h"
+
+namespace parquet {
+
+namespace {
+
+// Copies sizeof(T) bytes starting at `src` into a T. Going through memcpy rather
+// than dereferencing reinterpret_cast<const T*>(src) avoids undefined behaviour
+// from strict-aliasing violations and misaligned loads, because `src` is an
+// arbitrary char buffer (for example the bytes of a std::string).
+template <typename T>
+T LoadRawValue(const char* src) {
+  T value;
+  std::memcpy(&value, src, sizeof(T));
+  return value;
+}
+
+}  // namespace
+
+// Renders the raw bytes at `val`, interpreted as a value of `parquet_type`.
+// - BOOLEAN prints 0/1; numeric types print with default stream formatting.
+// - INT96 prints its three 32-bit words, each followed by a space.
+// - BYTE_ARRAY / FIXED_LEN_BYTE_ARRAY: `val` holds a ByteArray / FLBA struct;
+//   each pointed-to byte is printed as a character followed by a space.
+//   `length` is the byte count used for FIXED_LEN_BYTE_ARRAY only.
+// Returns an empty string for a null `val` or an unhandled type.
+std::string FormatValue(Type::type parquet_type, const char* val, int length) {
+  std::stringstream result;
+  if (val == nullptr) { return result.str(); }
+  switch (parquet_type) {
+    case Type::BOOLEAN:
+      // Compare the byte instead of loading a bool from arbitrary storage.
+      result << (LoadRawValue<uint8_t>(val) != 0);
+      break;
+    case Type::INT32:
+      result << LoadRawValue<int32_t>(val);
+      break;
+    case Type::INT64:
+      result << LoadRawValue<int64_t>(val);
+      break;
+    case Type::DOUBLE:
+      result << LoadRawValue<double>(val);
+      break;
+    case Type::FLOAT:
+      result << LoadRawValue<float>(val);
+      break;
+    case Type::INT96: {
+      for (int i = 0; i < 3; i++) {
+        result << LoadRawValue<int32_t>(val + i * sizeof(int32_t)) << " ";
+      }
+      break;
+    }
+    case Type::BYTE_ARRAY: {
+      const ByteArray a = LoadRawValue<ByteArray>(val);
+      if (a.ptr == nullptr) { break; }
+      for (int i = 0; i < static_cast<int>(a.len); i++) {
+        result << a.ptr[i] << " ";
+      }
+      break;
+    }
+    case Type::FIXED_LEN_BYTE_ARRAY: {
+      const FLBA a = LoadRawValue<FLBA>(val);
+      if (a.ptr == nullptr) { break; }
+      for (int i = 0; i < length; i++) {
+        result << a.ptr[i] << " ";
+      }
+      break;
+    }
+    default:
+      break;
+  }
+  return result.str();
+}
+
+// Maps a physical Type::type value to its upper-case name; any value outside
+// the enumeration yields "UNKNOWN".
+std::string type_to_string(Type::type t) {
+  switch (t) {
+    case Type::BOOLEAN: return "BOOLEAN";
+    case Type::INT32: return "INT32";
+    case Type::INT64: return "INT64";
+    case Type::INT96: return "INT96";
+    case Type::FLOAT: return "FLOAT";
+    case Type::DOUBLE: return "DOUBLE";
+    case Type::BYTE_ARRAY: return "BYTE_ARRAY";
+    case Type::FIXED_LEN_BYTE_ARRAY: return "FIXED_LEN_BYTE_ARRAY";
+    default: break;
+  }
+  return "UNKNOWN";
+}
+
+// Maps a LogicalType::type value to its upper-case name; any value outside the
+// enumeration yields "UNKNOWN".
+std::string logical_type_to_string(LogicalType::type t) {
+  switch (t) {
+    case LogicalType::NONE: return "NONE";
+    case LogicalType::UTF8: return "UTF8";
+    case LogicalType::MAP_KEY_VALUE: return "MAP_KEY_VALUE";
+    case LogicalType::LIST: return "LIST";
+    case LogicalType::ENUM: return "ENUM";
+    case LogicalType::DECIMAL: return "DECIMAL";
+    case LogicalType::DATE: return "DATE";
+    case LogicalType::TIME_MILLIS: return "TIME_MILLIS";
+    case LogicalType::TIMESTAMP_MILLIS: return "TIMESTAMP_MILLIS";
+    case LogicalType::UINT_8: return "UINT_8";
+    case LogicalType::UINT_16: return "UINT_16";
+    case LogicalType::UINT_32: return "UINT_32";
+    case LogicalType::UINT_64: return "UINT_64";
+    case LogicalType::INT_8: return "INT_8";
+    case LogicalType::INT_16: return "INT_16";
+    case LogicalType::INT_32: return "INT_32";
+    case LogicalType::INT_64: return "INT_64";
+    case LogicalType::JSON: return "JSON";
+    case LogicalType::BSON: return "BSON";
+    case LogicalType::INTERVAL: return "INTERVAL";
+    default: break;
+  }
+  return "UNKNOWN";
+}
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/types.h b/src/parquet/types.h
index a2d3622..ecd082d 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -266,105 +266,11 @@ inline std::string format_fwf(int width) {
   return ss.str();
 }
 
-static inline std::string type_to_string(Type::type t) {
-  switch (t) {
-    case Type::BOOLEAN:
-      return "BOOLEAN";
-      break;
-    case Type::INT32:
-      return "INT32";
-      break;
-    case Type::INT64:
-      return "INT64";
-      break;
-    case Type::INT96:
-      return "INT96";
-      break;
-    case Type::FLOAT:
-      return "FLOAT";
-      break;
-    case Type::DOUBLE:
-      return "DOUBLE";
-      break;
-    case Type::BYTE_ARRAY:
-      return "BYTE_ARRAY";
-      break;
-    case Type::FIXED_LEN_BYTE_ARRAY:
-      return "FIXED_LEN_BYTE_ARRAY";
-      break;
-    default:
-      return "UNKNOWN";
-      break;
-  }
-}
+std::string logical_type_to_string(LogicalType::type t);
 
-static inline std::string logical_type_to_string(LogicalType::type t) {
-  switch (t) {
-    case LogicalType::NONE:
-      return "NONE";
-      break;
-    case LogicalType::UTF8:
-      return "UTF8";
-      break;
-    case LogicalType::MAP_KEY_VALUE:
-      return "MAP_KEY_VALUE";
-      break;
-    case LogicalType::LIST:
-      return "LIST";
-      break;
-    case LogicalType::ENUM:
-      return "ENUM";
-      break;
-    case LogicalType::DECIMAL:
-      return "DECIMAL";
-      break;
-    case LogicalType::DATE:
-      return "DATE";
-      break;
-    case LogicalType::TIME_MILLIS:
-      return "TIME_MILLIS";
-      break;
-    case LogicalType::TIMESTAMP_MILLIS:
-      return "TIMESTAMP_MILLIS";
-      break;
-    case LogicalType::UINT_8:
-      return "UINT_8";
-      break;
-    case LogicalType::UINT_16:
-      return "UINT_16";
-      break;
-    case LogicalType::UINT_32:
-      return "UINT_32";
-      break;
-    case LogicalType::UINT_64:
-      return "UINT_64";
-      break;
-    case LogicalType::INT_8:
-      return "INT_8";
-      break;
-    case LogicalType::INT_16:
-      return "INT_16";
-      break;
-    case LogicalType::INT_32:
-      return "INT_32";
-      break;
-    case LogicalType::INT_64:
-      return "INT_64";
-      break;
-    case LogicalType::JSON:
-      return "JSON";
-      break;
-    case LogicalType::BSON:
-      return "BSON";
-      break;
-    case LogicalType::INTERVAL:
-      return "INTERVAL";
-      break;
-    default:
-      return "UNKNOWN";
-      break;
-  }
-}
+std::string type_to_string(Type::type t);
+
+std::string FormatValue(Type::type parquet_type, const char* val, int length);
 }  // namespace parquet
 
 #endif  // PARQUET_TYPES_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/util/input-output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc
index 9db2bdd..a98ae08 100644
--- a/src/parquet/util/input-output-test.cc
+++ b/src/parquet/util/input-output-test.cc
@@ -34,6 +34,67 @@
 
 namespace parquet {
 
+// Exercises BufferedInputStream over a 256-byte source whose byte i holds the
+// value i, viewing the sub-range [10, 256) through a 50-byte buffer. Because
+// the stream is stateful, the exact order of Peek/Read/Advance calls matters.
+TEST(TestBufferedInputStream, Basics) {
+  int64_t source_size = 256;
+  int64_t stream_offset = 10;
+  int64_t stream_size = source_size - stream_offset;
+  int64_t chunk_size = 50;
+  auto buf = std::make_shared<OwnedMutableBuffer>(source_size);
+  ASSERT_EQ(source_size, buf->size());
+  // Byte i of the source holds i, so every byte read identifies its own offset.
+  for (int i = 0; i < source_size; i++) {
+    buf->mutable_data()[i] = i;
+  }
+
+  std::unique_ptr<BufferReader> source(new BufferReader(buf));
+  std::unique_ptr<MemoryAllocator> allocator(new TrackingAllocator());
+  std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream(
+      allocator.get(), chunk_size, source.get(), stream_offset, stream_size));
+
+  const uint8_t* output;
+  int64_t bytes_read;
+
+  // The stream starts at source offset 10. Peek does not consume, so the
+  // following Read must return the same bytes.
+  output = stream->Peek(10, &bytes_read);
+  ASSERT_EQ(10, bytes_read);
+  for (int i = 0; i < 10; i++) {
+    ASSERT_EQ(10 + i, output[i]) << i;
+  }
+  output = stream->Read(10, &bytes_read);
+  ASSERT_EQ(10, bytes_read);
+  for (int i = 0; i < 10; i++) {
+    ASSERT_EQ(10 + i, output[i]) << i;
+  }
+  output = stream->Read(10, &bytes_read);
+  ASSERT_EQ(10, bytes_read);
+  for (int i = 0; i < 10; i++) {
+    ASSERT_EQ(20 + i, output[i]) << i;
+  }
+  stream->Advance(5);
+  stream->Advance(5);
+  // The stream is now at source offset 40.
+  // NOTE(review): the first refill loaded source bytes [10, 60) (buffer size
+  // is 50), so this 20-byte read ends exactly at the buffer edge and is served
+  // without a refill; it does not actually cross a buffer boundary.
+  output = stream->Read(20, &bytes_read);
+  ASSERT_EQ(20, bytes_read);
+  for (int i = 0; i < 20; i++) {
+    ASSERT_EQ(40 + i, output[i]) << i;
+  }
+  // Read more than the original chunk_size: forces the buffer to grow and
+  // refill from source offset 60.
+  output = stream->Read(60, &bytes_read);
+  ASSERT_EQ(60, bytes_read);
+  for (int i = 0; i < 60; i++) {
+    ASSERT_EQ(60 + i, output[i]) << i;
+  }
+
+  stream->Advance(120);
+  // The stream is now at source offset 240. Asking for 30 bytes runs past the
+  // end of the 256-byte source, so only the remaining 16 are returned.
+  output = stream->Read(30, &bytes_read);
+  ASSERT_EQ(16, bytes_read);
+  for (int i = 0; i < 16; i++) {
+    ASSERT_EQ(240 + i, output[i]) << i;
+  }
+}
+
 TEST(TestInMemoryOutputStream, Basics) {
   std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8));
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/util/input.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc
index 8eb3956..73ca8a5 100644
--- a/src/parquet/util/input.cc
+++ b/src/parquet/util/input.cc
@@ -209,6 +209,16 @@ InMemoryInputStream::InMemoryInputStream(const std::shared_ptr<Buffer>&
buffer)
   len_ = buffer_->size();
 }
 
+// Eagerly loads bytes [start, start + num_bytes) of `source` into memory.
+// Throws ParquetException when the source returns fewer bytes than requested.
+InMemoryInputStream::InMemoryInputStream(
+    RandomAccessSource* source, int64_t start, int64_t num_bytes)
+    : offset_(0) {
+  buffer_ = source->ReadAt(start, num_bytes);
+  len_ = buffer_->size();
+  if (len_ < num_bytes) {
+    throw ParquetException("Unable to read column chunk data");
+  }
+}
+
 const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
   *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
   return buffer_->data() + offset_;
@@ -224,4 +234,48 @@ void InMemoryInputStream::Advance(int64_t num_bytes) {
   offset_ += num_bytes;
 }
 
+// ----------------------------------------------------------------------
+// BufferedInputStream
+BufferedInputStream::BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
+    RandomAccessSource* source, int64_t start, int64_t num_bytes)
+    : buffer_(std::make_shared<OwnedMutableBuffer>(buffer_size, pool)),
+      source_(source),
+      stream_offset_(start),
+      stream_end_(start + num_bytes),
+      // Start with the cursor at the end of the (empty) buffer so that the
+      // first Peek/Read triggers a lazy refill from the source.
+      buffer_offset_(buffer_->size()),
+      buffer_size_(buffer_->size()) {}
+
+// Returns a pointer to the next min(num_to_peek, bytes left in the stream)
+// bytes without consuming them; the count is written to *num_bytes. Refills
+// the buffer from the source when it cannot satisfy the request, and throws
+// ParquetException if the source yields fewer bytes than needed.
+const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+  *num_bytes = std::min(num_to_peek, stream_end_ - stream_offset_);
+  // Grow the buffer if the request does not fit. After a resize, buffer_size_
+  // exceeds the number of bytes actually loaded from the source, so a refill
+  // must be forced below.
+  bool resized = false;
+  if (*num_bytes > buffer_size_) {
+    buffer_->Resize(*num_bytes);
+    buffer_size_ = buffer_->size();
+    DCHECK(buffer_size_ >= *num_bytes);
+    resized = true;
+  }
+  // Refill when the buffer was resized or holds too few unconsumed bytes.
+  // The size comparison alone is not enough after a resize: with a small
+  // buffer_offset_ (e.g. 0 right after a refill) it is false, and bytes that
+  // were never read from the source would be returned.
+  if (resized || *num_bytes > (buffer_size_ - buffer_offset_)) {
+    source_->Seek(stream_offset_);
+    // Never read past the end of this stream; from here on buffer_size_ is
+    // the number of valid bytes in buffer_.
+    buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_);
+    int64_t bytes_read = source_->Read(buffer_size_, buffer_->mutable_data());
+    if (bytes_read < *num_bytes) {
+      throw ParquetException("Failed reading column data from source");
+    }
+    buffer_offset_ = 0;
+  }
+  return buffer_->data() + buffer_offset_;
+}
+
+// Same as Peek, but also consumes the bytes it returns.
+const uint8_t* BufferedInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+  const uint8_t* data = Peek(num_to_read, num_bytes);
+  const int64_t consumed = *num_bytes;
+  buffer_offset_ += consumed;
+  stream_offset_ += consumed;
+  return data;
+}
+
+// Skips num_bytes without reading them. The buffer cursor and the absolute
+// stream cursor move together; no data is fetched here, and Peek refills if
+// the cursor has moved past the buffered bytes. No bounds check is performed.
+void BufferedInputStream::Advance(int64_t num_bytes) {
+  buffer_offset_ += num_bytes;
+  stream_offset_ += num_bytes;
+}
+
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/8a6eca9d/src/parquet/util/input.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h
index 04bbb34..85b78cd 100644
--- a/src/parquet/util/input.h
+++ b/src/parquet/util/input.h
@@ -29,6 +29,7 @@
 namespace parquet {
 
 class Buffer;
+class OwnedMutableBuffer;
 
 // ----------------------------------------------------------------------
 // Random access input (e.g. file-like)
@@ -168,6 +169,7 @@ class InputStream {
 // Implementation of an InputStream when all the bytes are in memory.
 class InMemoryInputStream : public InputStream {
  public:
+  // Eagerly reads bytes [start, start + num_bytes) of `source` into memory.
+  InMemoryInputStream(RandomAccessSource* source, int64_t start, int64_t num_bytes);
   explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer);
   virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
   virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
@@ -180,6 +182,25 @@ class InMemoryInputStream : public InputStream {
   int64_t offset_;
 };
 
+// Implementation of an InputStream when only some of the bytes are in memory.
+// Exposes bytes [start, start + num_bytes) of a RandomAccessSource through a
+// buffer of buffer_size bytes allocated from `pool`. The buffer grows when a
+// single Peek/Read requests more than it can hold.
+class BufferedInputStream : public InputStream {
+ public:
+  // `num_bytes` is the length of the stream, not an end offset. `source` is
+  // stored as a raw, non-owning pointer, so it presumably must outlive this
+  // stream.
+  BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
+      RandomAccessSource* source, int64_t start, int64_t num_bytes);
+  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
+  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
+
+  virtual void Advance(int64_t num_bytes);
+
+ private:
+  // Holds the bytes most recently loaded from source_.
+  std::shared_ptr<OwnedMutableBuffer> buffer_;
+  RandomAccessSource* source_;
+  // Absolute source offset of the next unconsumed byte.
+  int64_t stream_offset_;
+  // Absolute source offset one past the last byte of this stream.
+  int64_t stream_end_;
+  // Position in buffer_ corresponding to stream_offset_.
+  int64_t buffer_offset_;
+  // Number of valid bytes in buffer_ after a refill; may be smaller than the
+  // allocation near the end of the stream.
+  int64_t buffer_size_;
+};
+
 }  // namespace parquet
 
 #endif  // PARQUET_UTIL_INPUT_H


Mime
View raw message