parquet-commits mailing list archives

Site index · List index
Message view « Date » · « Thread »
Top « Date » · « Thread »
From jul...@apache.org
Subject parquet-cpp git commit: PARQUET-497: Decouple serialized file internals from the ParquetFileReader public API
Date Thu, 11 Feb 2016 08:21:36 GMT
Repository: parquet-cpp
Updated Branches:
  refs/heads/master c11e7d487 -> d51a1dcdd


PARQUET-497: Decouple serialized file internals from the ParquetFileReader public API

This depends on PARQUET-501. A bit of a refactoring bloodbath, but extremely important to
split out these details so that we can instrument the file reader public APIs with test fixtures
for unit testing purposes.

Author: Wes McKinney <wes@cloudera.com>

Closes #47 from wesm/PARQUET-497 and squashes the following commits:

aa152ad [Wes McKinney] Decouple Parquet file format details and Thrift metadata from the ParquetFileReader
and RowGroupReader public APIs.


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/d51a1dcd
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/d51a1dcd
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/d51a1dcd

Branch: refs/heads/master
Commit: d51a1dcddc9b42b83333581d68607c6299d37aa8
Parents: c11e7d4
Author: Wes McKinney <wes@cloudera.com>
Authored: Thu Feb 11 00:21:32 2016 -0800
Committer: Julien Le Dem <julien@dremio.com>
Committed: Thu Feb 11 00:21:32 2016 -0800

----------------------------------------------------------------------
 CMakeLists.txt                      |   5 +-
 example/parquet-dump-schema.cc      |  25 +--
 example/parquet_reader.cc           |  18 +--
 src/parquet/CMakeLists.txt          |   1 -
 src/parquet/file/CMakeLists.txt     |  20 +++
 src/parquet/file/reader-internal.cc | 166 ++++++++++++++++++++
 src/parquet/file/reader-internal.h  |  79 ++++++++++
 src/parquet/file/reader.cc          | 207 ++++++++++++++++++++++++
 src/parquet/file/reader.h           | 139 ++++++++++++++++
 src/parquet/parquet.h               |   2 +-
 src/parquet/reader-test.cc          |  21 ++-
 src/parquet/reader.cc               | 262 -------------------------------
 src/parquet/reader.h                | 118 --------------
 src/parquet/schema/descriptor.h     |   4 +
 14 files changed, 638 insertions(+), 429 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 452b315..d262375 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -293,7 +293,9 @@ set(LIBPARQUET_SRCS
   src/parquet/column/serialized-page.cc
   src/parquet/column/reader.cc
   src/parquet/column/scanner.cc
-  src/parquet/reader.cc
+
+  src/parquet/file/reader.cc
+  src/parquet/file/reader-internal.cc
 
   src/parquet/schema/converter.cc
   src/parquet/schema/descriptor.cc
@@ -331,6 +333,7 @@ add_subdirectory(src/parquet)
 add_subdirectory(src/parquet/column)
 add_subdirectory(src/parquet/compression)
 add_subdirectory(src/parquet/encodings)
+add_subdirectory(src/parquet/file)
 add_subdirectory(src/parquet/schema)
 add_subdirectory(src/parquet/thrift)
 add_subdirectory(src/parquet/util)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/example/parquet-dump-schema.cc
----------------------------------------------------------------------
diff --git a/example/parquet-dump-schema.cc b/example/parquet-dump-schema.cc
index 09c715c..d8eb2ee 100644
--- a/example/parquet-dump-schema.cc
+++ b/example/parquet-dump-schema.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include <parquet/reader.h>
+#include <parquet/parquet.h>
 #include <parquet/schema/converter.h>
 #include <parquet/schema/printer.h>
 
@@ -23,31 +23,12 @@
 
 using namespace parquet_cpp;
 
-void DumpSchema(const ParquetFileReader* reader) {
-  auto schema = reader->metadata().schema;
-  schema::FlatSchemaConverter converter(&schema[0], schema.size());
-  std::unique_ptr<schema::Node> root = converter.Convert();
-
-  PrintSchema(root.get(), std::cout);
-}
-
 int main(int argc, char** argv) {
   std::string filename = argv[1];
 
-  parquet_cpp::ParquetFileReader reader;
-  parquet_cpp::LocalFileSource file;
-
-  file.Open(filename);
-  if (!file.is_open()) {
-    std::cerr << "Could not open file " << file.path()
-              << std::endl;
-    return -1;
-  }
-
   try {
-    reader.Open(&file);
-    reader.ParseMetaData();
-    DumpSchema(&reader);
+    std::unique_ptr<ParquetFileReader> reader = ParquetFileReader::OpenFile(filename);
+    PrintSchema(reader->descr()->schema().get(), std::cout);
   } catch (const std::exception& e) {
     std::cerr << "Parquet error: "
               << e.what()

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/example/parquet_reader.cc
----------------------------------------------------------------------
diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc
index ca717df..8e498c7 100644
--- a/example/parquet_reader.cc
+++ b/example/parquet_reader.cc
@@ -18,6 +18,9 @@
 #include <parquet/parquet.h>
 
 #include <iostream>
+#include <memory>
+
+using namespace parquet_cpp;
 
 int main(int argc, char** argv) {
   if (argc > 3) {
@@ -39,20 +42,9 @@ int main(int argc, char** argv) {
     }
   }
 
-  parquet_cpp::ParquetFileReader reader;
-  parquet_cpp::LocalFileSource file;
-
-  file.Open(filename);
-  if (!file.is_open()) {
-    std::cerr << "Could not open file " << file.path()
-              << std::endl;
-    return -1;
-  }
-
   try {
-    reader.Open(&file);
-    reader.ParseMetaData();
-    reader.DebugPrint(std::cout, print_values);
+    std::unique_ptr<ParquetFileReader> reader = ParquetFileReader::OpenFile(filename);
+    reader->DebugPrint(std::cout, print_values);
   } catch (const std::exception& e) {
     std::cerr << "Parquet error: "
               << e.what()

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index 2d69ba0..6a47917 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -18,7 +18,6 @@
 # Headers: top level
 install(FILES
   parquet.h
-  reader.h
   exception.h
   types.h
   DESTINATION include/parquet)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/file/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/file/CMakeLists.txt b/src/parquet/file/CMakeLists.txt
new file mode 100644
index 0000000..ef6ac01
--- /dev/null
+++ b/src/parquet/file/CMakeLists.txt
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+install(FILES
+  reader.h
+  DESTINATION include/parquet/file)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
new file mode 100644
index 0000000..7b0a719
--- /dev/null
+++ b/src/parquet/file/reader-internal.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file/reader-internal.h"
+
+#include <cstring>
+#include <memory>
+#include <vector>
+
+#include "parquet/column/serialized-page.h"
+#include "parquet/schema/converter.h"
+#include "parquet/thrift/util.h"
+#include "parquet/util/input.h"
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// SerializedRowGroup
+
+// Throw ParquetException unless 0 <= i < num_columns. Indexing the Thrift
+// column list with an out-of-range index would otherwise be undefined behavior.
+static void CheckColumnIndex(int i, size_t num_columns) {
+  if (i < 0 || static_cast<size_t>(i) >= num_columns) {
+    throw ParquetException("Column index " + std::to_string(i) +
+        " is out of range for a row group with " +
+        std::to_string(num_columns) + " columns");
+  }
+}
+
+// Number of column chunks recorded in this row group's metadata
+int SerializedRowGroup::num_columns() const {
+  return static_cast<int>(metadata_->columns.size());
+}
+
+// Read the entire (compressed) column chunk for column i into memory and
+// wrap it in a PageReader.
+//
+// Throws ParquetException if i is out of range or the chunk cannot be read
+// in full.
+std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
+  CheckColumnIndex(i, metadata_->columns.size());
+
+  // Read column chunk from the file
+  const parquet::ColumnChunk& col = metadata_->columns[i];
+
+  // When a dictionary page is present and precedes the first data page, the
+  // chunk starts at the dictionary page
+  int64_t col_start = col.meta_data.data_page_offset;
+  if (col.meta_data.__isset.dictionary_page_offset &&
+      col_start > col.meta_data.dictionary_page_offset) {
+    col_start = col.meta_data.dictionary_page_offset;
+  }
+
+  // TODO(wesm): some input streams (e.g. memory maps) may not require
+  // copying data. This should be added to the input stream API to support
+  // zero-copy streaming
+  std::unique_ptr<ScopedInMemoryInputStream> buffer(
+      new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
+
+  source_->Seek(col_start);
+  size_t bytes_read = source_->Read(buffer->size(), buffer->data());
+  if (bytes_read != buffer->size()) {
+    throw ParquetException("Unable to read column chunk data");
+  }
+
+  std::unique_ptr<InputStream> input(std::move(buffer));
+  return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(input),
+      col.meta_data.codec));
+}
+
+// Collect the value count and statistics recorded in the column chunk
+// metadata for column i. Throws ParquetException if i is out of range.
+RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) {
+  CheckColumnIndex(i, metadata_->columns.size());
+  const parquet::ColumnMetaData& meta_data = metadata_->columns[i].meta_data;
+
+  RowGroupStatistics result;
+  result.num_values = meta_data.num_values;
+  // NOTE(review): `statistics` is not checked via __isset here; when it is
+  // absent these presumably read default-constructed values -- confirm
+  result.null_count = meta_data.statistics.null_count;
+  result.distinct_count = meta_data.statistics.distinct_count;
+
+  return result;
+}
+
+// ----------------------------------------------------------------------
+// SerializedFile: Parquet on-disk layout
+
+// The file tail is a 4-byte metadata length followed by the 4 magic bytes
+static constexpr uint32_t FOOTER_SIZE = 8;
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// Construct a SerializedFile over `source` (taking ownership of it) and parse
+// its footer and metadata. Throws ParquetException if the file is invalid.
+std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open(
+    std::unique_ptr<RandomAccessSource> source) {
+  // The constructor is private, so allocate here. Hold the object by its
+  // concrete type until it is fully initialized: if ParseMetaData() throws,
+  // `file` is destroyed as a SerializedFile, so cleanup does not depend on
+  // Contents declaring a virtual destructor.
+  std::unique_ptr<SerializedFile> file(new SerializedFile(std::move(source)));
+
+  // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
+  file->ParseMetaData();
+
+  return std::unique_ptr<ParquetFileReader::Contents>(std::move(file));
+}
+
+// The constructor only takes ownership of the source; metadata is parsed
+// separately by ParseMetaData()
+SerializedFile::SerializedFile(std::unique_ptr<RandomAccessSource> source)
+    : source_(std::move(source)) {}
+
+// Delegate cleanup to the owned data source
+void SerializedFile::Close() {
+  source_->Close();
+}
+
+// Build a RowGroupReader for row group i. The returned reader borrows the
+// data source, the schema and the Thrift row group metadata, all of which are
+// members of this SerializedFile.
+std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
+  const parquet::RowGroup* row_group_meta = &metadata_.row_groups[i];
+  std::unique_ptr<SerializedRowGroup> row_group(
+      new SerializedRowGroup(source_.get(), &schema_, row_group_meta));
+  return std::make_shared<RowGroupReader>(&schema_, std::move(row_group));
+}
+
+int64_t SerializedFile::num_rows() const { return metadata_.num_rows; }
+
+int SerializedFile::num_columns() const { return schema_.num_columns(); }
+
+int SerializedFile::num_row_groups() const {
+  return metadata_.row_groups.size();
+}
+
+
+// Validate the footer, read and deserialize the Thrift FileMetaData, and
+// initialize the SchemaDescriptor from the flat schema it contains.
+//
+// The file is expected to end with:
+//   <FileMetaData: metadata_len bytes> <metadata_len: 4 bytes> <"PAR1">
+//
+// Throws ParquetException on any structural problem.
+void SerializedFile::ParseMetaData() {
+  size_t filesize = source_->Size();
+
+  if (filesize < FOOTER_SIZE) {
+    throw ParquetException("Corrupted file, smaller than file footer");
+  }
+
+  uint8_t footer_buffer[FOOTER_SIZE];
+  source_->Seek(filesize - FOOTER_SIZE);
+  size_t bytes_read = source_->Read(FOOTER_SIZE, footer_buffer);
+
+  if (bytes_read != FOOTER_SIZE) {
+    throw ParquetException("Invalid parquet file. Corrupt footer.");
+  }
+  if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
+    throw ParquetException("Invalid parquet file. Corrupt footer.");
+  }
+
+  // memcpy rather than reinterpret_cast: a byte buffer has no uint32_t
+  // alignment guarantee and the cast would break strict aliasing
+  uint32_t metadata_len;
+  memcpy(&metadata_len, footer_buffer, sizeof(metadata_len));
+
+  // Compare by subtraction in size_t: FOOTER_SIZE + metadata_len evaluated in
+  // uint32_t wraps for lengths close to UINT32_MAX, which would let a corrupt
+  // length pass the check. filesize >= FOOTER_SIZE is established above.
+  if (metadata_len > filesize - FOOTER_SIZE) {
+    throw ParquetException("Invalid parquet file. File is less than "
+        "file metadata size.");
+  }
+  size_t metadata_start = filesize - FOOTER_SIZE - metadata_len;
+
+  source_->Seek(metadata_start);
+
+  std::vector<uint8_t> metadata_buffer(metadata_len);
+  // data() instead of &v[0]: the latter is undefined for an empty vector
+  bytes_read = source_->Read(metadata_len, metadata_buffer.data());
+  if (bytes_read != metadata_len) {
+    throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
+  }
+
+  DeserializeThriftMsg(metadata_buffer.data(), &metadata_len, &metadata_);
+
+  if (metadata_.schema.empty()) {
+    throw ParquetException("Invalid parquet file. Empty schema.");
+  }
+  schema::FlatSchemaConverter converter(&metadata_.schema[0],
+      metadata_.schema.size());
+  schema_.Init(converter.Convert());
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
new file mode 100644
index 0000000..8ba105e
--- /dev/null
+++ b/src/parquet/file/reader-internal.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_FILE_READER_INTERNAL_H
+#define PARQUET_FILE_READER_INTERNAL_H
+
+#include "parquet/file/reader.h"
+
+#include <memory>
+
+#include "parquet/schema/descriptor.h"
+#include "parquet/util/input.h"
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet_cpp {
+
+// RowGroupReader::Contents implementation for the Parquet file specification.
+// Reads column chunks of one row group directly from a RandomAccessSource.
+//
+// None of the constructor arguments are owned: the source, schema and row
+// group metadata must outlive this object.
+class SerializedRowGroup : public RowGroupReader::Contents {
+ public:
+  SerializedRowGroup(RandomAccessSource* source, const SchemaDescriptor* schema,
+      const parquet::RowGroup* metadata) :
+      source_(source),
+      schema_(schema),
+      metadata_(metadata) {}
+
+  int num_columns() const override;
+  std::unique_ptr<PageReader> GetColumnPageReader(int i) override;
+  RowGroupStatistics GetColumnStats(int i) override;
+
+ private:
+  RandomAccessSource* source_;
+  const SchemaDescriptor* schema_;
+  const parquet::RowGroup* metadata_;
+};
+
+// An implementation of ParquetFileReader::Contents that deals with the Parquet
+// file structure, Thrift deserialization, and other internal matters
+
+class SerializedFile : public ParquetFileReader::Contents {
+ public:
+  // Open the file: validate the footer magic bytes, parse the Thrift metadata
+  // and initialize the schema. Throws ParquetException if the file is invalid.
+  //
+  // The returned object takes ownership of the data source.
+  static std::unique_ptr<ParquetFileReader::Contents> Open(
+      std::unique_ptr<RandomAccessSource> source);
+  void Close() override;
+  std::shared_ptr<RowGroupReader> GetRowGroup(int i) override;
+  int64_t num_rows() const override;
+  int num_columns() const override;
+  int num_row_groups() const override;
+
+ private:
+  // This class takes ownership of the provided data source
+  explicit SerializedFile(std::unique_ptr<RandomAccessSource> source);
+
+  std::unique_ptr<RandomAccessSource> source_;
+  parquet::FileMetaData metadata_;
+
+  void ParseMetaData();
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_FILE_READER_INTERNAL_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
new file mode 100644
index 0000000..6ef59ed
--- /dev/null
+++ b/src/parquet/file/reader.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file/reader.h"
+
+#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "parquet/column/reader.h"
+#include "parquet/column/scanner.h"
+
+#include "parquet/exception.h"
+#include "parquet/file/reader-internal.h"
+
+using std::string;
+using std::vector;
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// RowGroupReader public API
+
+// Throw ParquetException unless 0 <= i < num_columns.
+static void CheckColumnIndex(int i, int num_columns) {
+  if (i < 0 || i >= num_columns) {
+    std::stringstream ss;
+    ss << "Column index " << i << " is out of range for a row group with "
+       << num_columns << " columns";
+    throw ParquetException(ss.str());
+  }
+}
+
+RowGroupReader::RowGroupReader(const SchemaDescriptor* schema,
+    std::unique_ptr<Contents> contents) :
+    schema_(schema),
+    contents_(std::move(contents)) {}
+
+int RowGroupReader::num_columns() const {
+  return contents_->num_columns();
+}
+
+// Return the (cached) ColumnReader for column i, constructing it from the
+// Contents' page reader on first access. Throws ParquetException if i is out
+// of range.
+std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
+  CheckColumnIndex(i, num_columns());
+
+  auto it = column_readers_.find(i);
+  if (it != column_readers_.end()) {
+    // Already have constructed the ColumnReader
+    return it->second;
+  }
+
+  const ColumnDescriptor* descr = schema_->Column(i);
+
+  std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
+  std::shared_ptr<ColumnReader> reader = ColumnReader::Make(descr,
+      std::move(page_reader));
+  column_readers_[i] = reader;
+  return reader;
+}
+
+// Statistics for column i. Throws ParquetException if i is out of range.
+RowGroupStatistics RowGroupReader::GetColumnStats(int i) const {
+  CheckColumnIndex(i, num_columns());
+  return contents_->GetColumnStats(i);
+}
+
+// ----------------------------------------------------------------------
+// ParquetFileReader public API
+
+ParquetFileReader::ParquetFileReader() : schema_(nullptr) {}
+ParquetFileReader::~ParquetFileReader() {}
+
+// Open a Parquet file on the local filesystem and parse its metadata.
+// Throws ParquetException if the file cannot be opened or is not a valid
+// Parquet file.
+std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
+    const std::string& path) {
+  std::unique_ptr<LocalFileSource> file(new LocalFileSource());
+  file->Open(path);
+  // Check explicitly so an unopenable path yields a clear error rather than a
+  // failure inside metadata parsing. NOTE(review): assumes Open() reports
+  // failure through is_open() instead of throwing -- confirm.
+  if (!file->is_open()) {
+    throw ParquetException("Could not open file " + path);
+  }
+
+  auto contents = SerializedFile::Open(std::move(file));
+
+  std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
+  result->Open(std::move(contents));
+
+  return result;
+}
+
+// Take ownership of an initialized Contents implementation and cache a
+// pointer to its schema descriptor. Throws ParquetException on null input.
+void ParquetFileReader::Open(
+    std::unique_ptr<ParquetFileReader::Contents> contents) {
+  if (!contents) {
+    throw ParquetException("ParquetFileReader::Open: contents must not be null");
+  }
+  contents_ = std::move(contents);
+  schema_ = contents_->schema();
+}
+
+// Close the underlying contents. Calling Close() on a reader that was never
+// opened is a no-op instead of a null dereference.
+void ParquetFileReader::Close() {
+  if (contents_) {
+    contents_->Close();
+  }
+}
+
+int ParquetFileReader::num_row_groups() const {
+  return contents_->num_row_groups();
+}
+
+int64_t ParquetFileReader::num_rows() const {
+  return contents_->num_rows();
+}
+
+int ParquetFileReader::num_columns() const {
+  return schema_->num_columns();
+}
+
+// Return the (cached) RowGroupReader for row group i; the reader stays owned
+// by this ParquetFileReader. Throws ParquetException if i is out of range.
+RowGroupReader* ParquetFileReader::RowGroup(int i) {
+  if (i < 0 || i >= num_row_groups()) {
+    std::stringstream ss;
+    ss << "The file only has " << num_row_groups()
+       << " row groups, requested reader for: "
+       << i;
+    throw ParquetException(ss.str());
+  }
+
+  auto it = row_group_readers_.find(i);
+  if (it != row_group_readers_.end()) {
+    // Constructed the RowGroupReader already
+    return it->second.get();
+  }
+
+  std::shared_ptr<RowGroupReader> reader = contents_->GetRowGroup(i);
+  row_group_readers_[i] = reader;
+  return reader.get();
+}
+
+// ----------------------------------------------------------------------
+// ParquetFileReader::DebugPrint
+
+// Width of the left-justified column-name header in DebugPrint
+// (the fixed initial size is just for an example)
+#define COL_WIDTH "20"
+
+
+// Print file-level and per-row-group metadata to `stream`; when print_values
+// is true also print every value of every column, row by row.
+void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
+  stream << "File statistics:\n";
+  stream << "Total rows: " << this->num_rows() << "\n";
+
+  for (int i = 0; i < num_columns(); ++i) {
+    const ColumnDescriptor* descr = schema_->Column(i);
+    stream << "Column " << i << ": "
+           << descr->name()
+           << " ("
+           << type_to_string(descr->physical_type())
+           << ")\n";
+  }
+
+  for (int r = 0; r < num_row_groups(); ++r) {
+    stream << "--- Row Group " << r << " ---\n";
+
+    RowGroupReader* group_reader = RowGroup(r);
+
+    // int matches the API (no signed/unsigned comparisons) and the name
+    // avoids shadowing the num_columns() member function
+    const int group_columns = group_reader->num_columns();
+
+    // Print column metadata
+    for (int i = 0; i < group_columns; ++i) {
+      RowGroupStatistics stats = group_reader->GetColumnStats(i);
+
+      stream << "Column " << i << ": "
+             << stats.num_values << " rows, "
+             << stats.null_count << " null values, "
+             << stats.distinct_count << " distinct values\n";
+    }
+
+    if (!print_values) {
+      continue;
+    }
+
+    static constexpr size_t bufsize = 25;
+    char buffer[bufsize];
+
+    // Create readers for all columns and print contents
+    vector<std::shared_ptr<Scanner> > scanners(group_columns);
+    for (int i = 0; i < group_columns; ++i) {
+      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
+
+      // Literal concatenation yields "%-20s": left-justify the column name
+      snprintf(buffer, bufsize, "%-" COL_WIDTH "s",
+          column_schema(i)->name().c_str());
+      stream << buffer;
+
+      // This is OK in this method as long as the RowGroupReader does not get
+      // deleted
+      scanners[i] = Scanner::Make(col_reader);
+    }
+    stream << "\n";
+
+    bool hasRow;
+    do {
+      hasRow = false;
+      for (int i = 0; i < group_columns; ++i) {
+        if (scanners[i]->HasNext()) {
+          hasRow = true;
+          scanners[i]->PrintNext(stream, 17);
+        }
+      }
+      stream << "\n";
+    } while (hasRow);
+  }
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
new file mode 100644
index 0000000..3ff8697
--- /dev/null
+++ b/src/parquet/file/reader.h
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_FILE_READER_H
+#define PARQUET_FILE_READER_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <stdio.h>
+#include <unordered_map>
+
+#include "parquet/types.h"
+#include "parquet/schema/descriptor.h"
+
+// TODO(wesm): Still depends on Thrift
+#include "parquet/column/page.h"
+
+namespace parquet_cpp {
+
+class ColumnReader;
+class ParquetFileReader;
+
+// Per-column summary for one row group, as returned by
+// RowGroupReader::GetColumnStats. Members have no default initializers.
+struct RowGroupStatistics {
+  // Value count taken from the column chunk metadata
+  int64_t num_values;
+  // Null count from the column chunk statistics
+  int64_t null_count;
+  // Distinct-value count from the column chunk statistics
+  int64_t distinct_count;
+};
+
+class RowGroupReader {
+ public:
+  // Abstract implementation interface (PIMPL). Concrete implementations, such
+  // as the one for serialized Parquet files, live outside the public API.
+  struct Contents {
+    // Implementations are owned and destroyed through
+    // std::unique_ptr<Contents>, so the destructor must be virtual
+    virtual ~Contents() {}
+    virtual int num_columns() const = 0;
+    virtual RowGroupStatistics GetColumnStats(int i) = 0;
+    virtual std::unique_ptr<PageReader> GetColumnPageReader(int i) = 0;
+  };
+
+  RowGroupReader(const SchemaDescriptor* schema, std::unique_ptr<Contents> contents);
+
+  // Construct a ColumnReader for the indicated row group-relative
+  // column. Ownership is shared with the RowGroupReader.
+  std::shared_ptr<ColumnReader> Column(int i);
+  int num_columns() const;
+
+  RowGroupStatistics GetColumnStats(int i) const;
+
+ private:
+  // Not owned; owned (through its Contents) by the parent ParquetFileReader
+  const SchemaDescriptor* schema_;
+
+  // PIMPL idiom: the Contents implementation is defined outside this header
+  // so that file-format details stay out of the public API and test fixtures
+  // can supply their own implementation.
+  std::unique_ptr<Contents> contents_;
+
+  // Column index -> ColumnReader
+  std::unordered_map<int, std::shared_ptr<ColumnReader> > column_readers_;
+};
+
+
+class ParquetFileReader {
+ public:
+  // Abstract implementation interface (PIMPL)
+  struct Contents {
+    // Implementations (which may own a data source and parsed metadata) are
+    // destroyed through std::unique_ptr<Contents>, so the destructor must be
+    // virtual for their members to be destroyed
+    virtual ~Contents() {}
+
+    // Perform any cleanup associated with the file contents
+    virtual void Close() = 0;
+
+    virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i) = 0;
+
+    virtual int64_t num_rows() const = 0;
+    virtual int num_columns() const = 0;
+    virtual int num_row_groups() const = 0;
+
+    // Return const-pointer to make it clear that this object is not to be copied
+    const SchemaDescriptor* schema() const {
+      return &schema_;
+    }
+    SchemaDescriptor schema_;
+  };
+
+  ParquetFileReader();
+  ~ParquetFileReader();
+
+  // API Convenience to open a serialized Parquet file on disk
+  static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path);
+
+  void Open(std::unique_ptr<Contents> contents);
+  void Close();
+
+  // The RowGroupReader is owned by the FileReader
+  RowGroupReader* RowGroup(int i);
+
+  int num_columns() const;
+  int64_t num_rows() const;
+  int num_row_groups() const;
+
+  // Returns the file schema descriptor (nullptr before Open)
+  const SchemaDescriptor* descr() const {
+    return schema_;
+  }
+
+  const ColumnDescriptor* column_schema(int i) const {
+    return schema_->Column(i);
+  }
+
+  void DebugPrint(std::ostream& stream, bool print_values = true);
+
+ private:
+  // PIMPL idiom: the Contents implementation is defined outside this header
+  // so that file-format details stay out of the public API and test fixtures
+  // can supply their own implementation.
+  std::unique_ptr<Contents> contents_;
+
+  // The SchemaDescriptor is provided by the Contents impl
+  const SchemaDescriptor* schema_;
+
+  // Row group index -> RowGroupReader
+  std::unordered_map<int, std::shared_ptr<RowGroupReader> > row_group_readers_;
+};
+
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_FILE_READER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/parquet.h
----------------------------------------------------------------------
diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h
index 7030d0e..b8624ae 100644
--- a/src/parquet/parquet.h
+++ b/src/parquet/parquet.h
@@ -27,8 +27,8 @@
 #include <vector>
 
 #include "parquet/exception.h"
-#include "parquet/reader.h"
 #include "parquet/column/reader.h"
+#include "parquet/file/reader.h"
 
 #include "parquet/util/input.h"
 #include "parquet/util/output.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index 8da8b99..8599e7e 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -17,12 +17,13 @@
 
 #include <cstdlib>
 #include <iostream>
+#include <memory>
 #include <sstream>
 #include <string>
 
 #include <gtest/gtest.h>
 
-#include "parquet/reader.h"
+#include "parquet/file/reader.h"
 #include "parquet/column/reader.h"
 #include "parquet/column/scanner.h"
 #include "parquet/util/input.h"
@@ -41,24 +42,22 @@ class TestAllTypesPlain : public ::testing::Test {
 
     std::stringstream ss;
     ss << dir_string << "/" << "alltypes_plain.parquet";
-    file_.Open(ss.str());
-    reader_.Open(&file_);
+
+    reader_ = ParquetFileReader::OpenFile(ss.str());
   }
 
   void TearDown() {}
 
  protected:
-  LocalFileSource file_;
-  ParquetFileReader reader_;
+  std::unique_ptr<ParquetFileReader> reader_;
 };
 
 
-TEST_F(TestAllTypesPlain, ParseMetaData) {
-  reader_.ParseMetaData();
+// SetUp() opens the file via ParquetFileReader::OpenFile and TearDown() is
+// empty, so this passes as long as opening and destroying the reader succeed
+TEST_F(TestAllTypesPlain, NoopConstructDestruct) {
 }
 
 TEST_F(TestAllTypesPlain, TestBatchRead) {
-  RowGroupReader* group = reader_.RowGroup(0);
+  RowGroupReader* group = reader_->RowGroup(0);
 
   // column 0, id
   std::shared_ptr<Int32Reader> col =
@@ -86,7 +85,7 @@ TEST_F(TestAllTypesPlain, TestBatchRead) {
 }
 
 TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
-  RowGroupReader* group = reader_.RowGroup(0);
+  RowGroupReader* group = reader_->RowGroup(0);
 
   // column 0, id
   std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
@@ -103,7 +102,7 @@ TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
 
 
 TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) {
-  RowGroupReader* group = reader_.RowGroup(0);
+  RowGroupReader* group = reader_->RowGroup(0);
 
   // column 0, id
   std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
@@ -118,7 +117,7 @@ TEST_F(TestAllTypesPlain, DebugPrintWorks) {
   std::stringstream ss;
 
   // Automatically parses metadata
-  reader_.DebugPrint(ss);
+  reader_->DebugPrint(ss);
 
   std::string result = ss.str();
   ASSERT_GT(result.size(), 0);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
deleted file mode 100644
index 3fcce90..0000000
--- a/src/parquet/reader.cc
+++ /dev/null
@@ -1,262 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/reader.h"
-
-#include <cstdio>
-#include <cstring>
-#include <memory>
-#include <sstream>
-#include <string>
-#include <vector>
-
-#include "parquet/column/reader.h"
-#include "parquet/column/serialized-page.h"
-#include "parquet/column/scanner.h"
-
-#include "parquet/exception.h"
-#include "parquet/schema/converter.h"
-#include "parquet/thrift/util.h"
-
-using std::string;
-using std::vector;
-
-namespace parquet_cpp {
-
-// ----------------------------------------------------------------------
-// RowGroupReader
-
-std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) {
-  // TODO: boundschecking
-  auto it = column_readers_.find(i);
-  if (it !=  column_readers_.end()) {
-    // Already have constructed the ColumnReader
-    return it->second;
-  }
-
-  const parquet::ColumnChunk& col = row_group_->columns[i];
-
-  size_t col_start = col.meta_data.data_page_offset;
-  if (col.meta_data.__isset.dictionary_page_offset &&
-      col_start > col.meta_data.dictionary_page_offset) {
-    col_start = col.meta_data.dictionary_page_offset;
-  }
-
-  std::unique_ptr<InputStream> input(
-      new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
-
-  RandomAccessSource* source = this->parent_->buffer_;
-
-  source->Seek(col_start);
-
-  // TODO(wesm): Law of demeter violation
-  ScopedInMemoryInputStream* scoped_input =
-    static_cast<ScopedInMemoryInputStream*>(input.get());
-  size_t bytes_read = source->Read(scoped_input->size(), scoped_input->data());
-  if (bytes_read != scoped_input->size()) {
-    std::cout << "Bytes needed: " << col.meta_data.total_compressed_size << std::endl;
-    std::cout << "Bytes read: " << bytes_read << std::endl;
-    throw ParquetException("Unable to read column chunk data");
-  }
-
-  const ColumnDescriptor* descr = parent_->column_descr(i);
-
-  std::unique_ptr<PageReader> pager(
-      new SerializedPageReader(std::move(input), col.meta_data.codec));
-
-  std::shared_ptr<ColumnReader> reader = ColumnReader::Make(descr,
-      std::move(pager));
-  column_readers_[i] = reader;
-
-  return reader;
-}
-
-// ----------------------------------------------------------------------
-// ParquetFileReader
-
-// 4 byte constant + 4 byte metadata len
-static constexpr uint32_t FOOTER_SIZE = 8;
-static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-ParquetFileReader::ParquetFileReader() :
-    parsed_metadata_(false),
-    buffer_(nullptr) {}
-
-ParquetFileReader::~ParquetFileReader() {}
-
-void ParquetFileReader::Open(RandomAccessSource* buffer) {
-  buffer_ = buffer;
-}
-
-void ParquetFileReader::Close() {
-  buffer_->Close();
-}
-
-RowGroupReader* ParquetFileReader::RowGroup(size_t i) {
-  if (!parsed_metadata_) {
-    ParseMetaData();
-  }
-
-  if (i >= num_row_groups()) {
-    std::stringstream ss;
-    ss << "The file only has " << num_row_groups()
-       << "row groups, requested reader for: "
-       << i;
-    throw ParquetException(ss.str());
-  }
-
-  auto it = row_group_readers_.find(i);
-  if (it != row_group_readers_.end()) {
-    // Constructed the RowGroupReader already
-    return it->second.get();
-  }
-  if (!parsed_metadata_) {
-    ParseMetaData();
-  }
-
-  // Construct the RowGroupReader
-  row_group_readers_[i] = std::make_shared<RowGroupReader>(this,
-      &metadata_.row_groups[i]);
-  return row_group_readers_[i].get();
-}
-
-void ParquetFileReader::ParseMetaData() {
-  size_t filesize = buffer_->Size();
-
-  if (filesize < FOOTER_SIZE) {
-    throw ParquetException("Corrupted file, smaller than file footer");
-  }
-
-  uint8_t footer_buffer[FOOTER_SIZE];
-
-  buffer_->Seek(filesize - FOOTER_SIZE);
-
-  size_t bytes_read = buffer_->Read(FOOTER_SIZE, footer_buffer);
-
-  if (bytes_read != FOOTER_SIZE) {
-    throw ParquetException("Invalid parquet file. Corrupt footer.");
-  }
-  if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
-    throw ParquetException("Invalid parquet file. Corrupt footer.");
-  }
-
-  uint32_t metadata_len = *reinterpret_cast<uint32_t*>(footer_buffer);
-  size_t metadata_start = filesize - FOOTER_SIZE - metadata_len;
-  if (FOOTER_SIZE + metadata_len > filesize) {
-    throw ParquetException("Invalid parquet file. File is less than file metadata size.");
-  }
-
-  buffer_->Seek(metadata_start);
-
-  std::vector<uint8_t> metadata_buffer(metadata_len);
-  bytes_read = buffer_->Read(metadata_len, &metadata_buffer[0]);
-  if (bytes_read != metadata_len) {
-    throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
-  }
-  DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_);
-
-  schema::FlatSchemaConverter converter(&metadata_.schema[0],
-      metadata_.schema.size());
-  schema_descr_.Init(converter.Convert());
-
-  parsed_metadata_ = true;
-}
-
-// ----------------------------------------------------------------------
-// ParquetFileReader::DebugPrint
-
-// the fixed initial size is just for an example
-#define COL_WIDTH "20"
-
-
-void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
-  if (!parsed_metadata_) {
-    ParseMetaData();
-  }
-
-  stream << "File statistics:\n";
-  stream << "Total rows: " << metadata_.num_rows << "\n";
-  for (int i = 0; i < num_columns(); ++i) {
-    const ColumnDescriptor* descr = column_descr(i);
-    stream << "Column " << i << ": "
-           << descr->name()
-           << " ("
-           << type_to_string(descr->physical_type())
-           << ")" << std::endl;
-  }
-
-  for (int r = 0; r < num_row_groups(); ++r) {
-    stream << "--- Row Group " << r << " ---\n";
-
-    RowGroupReader* group_reader = RowGroup(r);
-
-    // Print column metadata
-    size_t num_columns = group_reader->num_columns();
-
-    for (int i = 0; i < num_columns; ++i) {
-      const parquet::ColumnMetaData* meta_data = group_reader->column_metadata(i);
-      stream << "Column " << i << ": "
-             << meta_data->num_values << " rows, "
-             << meta_data->statistics.null_count << " null values, "
-             << meta_data->statistics.distinct_count << " distinct values, "
-             << "min value: " << (meta_data->statistics.min.length() > 0 ?
-                 meta_data->statistics.min : "N/A")
-             << ", max value: " << (meta_data->statistics.max.length() > 0 ?
-                 meta_data->statistics.max : "N/A") << ".\n";
-    }
-
-    if (!print_values) {
-      continue;
-    }
-
-    static constexpr size_t bufsize = 25;
-    char buffer[bufsize];
-
-    // Create readers for all columns and print contents
-    vector<std::shared_ptr<Scanner> > scanners(num_columns, NULL);
-    for (int i = 0; i < num_columns; ++i) {
-      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
-      Type::type col_type = col_reader->type();
-
-      std::stringstream ss;
-      ss << "%-" << COL_WIDTH << "s";
-      std::string fmt = ss.str();
-
-      snprintf(buffer, bufsize, fmt.c_str(), column_descr(i)->name().c_str());
-      stream << buffer;
-
-      // This is OK in this method as long as the RowGroupReader does not get
-      // deleted
-      scanners[i] = Scanner::Make(col_reader);
-    }
-    stream << "\n";
-
-    bool hasRow;
-    do {
-      hasRow = false;
-      for (int i = 0; i < num_columns; ++i) {
-        if (scanners[i]->HasNext()) {
-          hasRow = true;
-          scanners[i]->PrintNext(stream, 17);
-        }
-      }
-      stream << "\n";
-    } while (hasRow);
-  }
-}
-
-} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/reader.h b/src/parquet/reader.h
deleted file mode 100644
index 3a9dc5d..0000000
--- a/src/parquet/reader.h
+++ /dev/null
@@ -1,118 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_FILE_READER_H
-#define PARQUET_FILE_READER_H
-
-#include <cstdint>
-#include <memory>
-#include <string>
-#include <stdio.h>
-#include <unordered_map>
-
-#include "parquet/thrift/parquet_types.h"
-
-#include "parquet/types.h"
-#include "parquet/schema/descriptor.h"
-#include "parquet/util/input.h"
-
-namespace parquet_cpp {
-
-class ColumnReader;
-class ParquetFileReader;
-
-class RowGroupReader {
- public:
-  RowGroupReader(ParquetFileReader* parent, parquet::RowGroup* group) :
-      parent_(parent),
-      row_group_(group) {}
-
-  // Construct a ColumnReader for the indicated row group-relative
-  // column. Ownership is shared with the RowGroupReader.
-  std::shared_ptr<ColumnReader> Column(size_t i);
-
-  const parquet::ColumnMetaData* column_metadata(size_t i) const {
-    return &row_group_->columns[i].meta_data;
-  }
-
-  size_t num_columns() const {
-    return row_group_->columns.size();
-  }
-
- private:
-  friend class ParquetFileReader;
-
-  ParquetFileReader* parent_;
-  parquet::RowGroup* row_group_;
-
-  // Column index -> ColumnReader
-  std::unordered_map<int, std::shared_ptr<ColumnReader> > column_readers_;
-};
-
-
-class ParquetFileReader {
- public:
-  ParquetFileReader();
-  ~ParquetFileReader();
-
-  // This class does _not_ take ownership of the file. You must manage its
-  // lifetime separately
-  void Open(RandomAccessSource* buffer);
-
-  void Close();
-
-  void ParseMetaData();
-
-  // The RowGroupReader is owned by the FileReader
-  RowGroupReader* RowGroup(size_t i);
-
-  size_t num_row_groups() const {
-    return metadata_.row_groups.size();
-  }
-
-  const ColumnDescriptor* column_descr(size_t i) const {
-    return schema_descr_.Column(i);
-  }
-
-  size_t num_columns() const {
-    return schema_descr_.num_columns();
-  }
-
-  const parquet::FileMetaData& metadata() const {
-    return metadata_;
-  }
-
-  void DebugPrint(std::ostream& stream, bool print_values = true);
-
- private:
-  friend class RowGroupReader;
-
-  parquet::FileMetaData metadata_;
-  SchemaDescriptor schema_descr_;
-
-  bool parsed_metadata_;
-
-  // Row group index -> RowGroupReader
-  std::unordered_map<int, std::shared_ptr<RowGroupReader> > row_group_readers_;
-
-  RandomAccessSource* buffer_;
-};
-
-
-} // namespace parquet_cpp
-
-#endif // PARQUET_FILE_READER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/schema/descriptor.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema/descriptor.h b/src/parquet/schema/descriptor.h
index 144666f..d27dcc1 100644
--- a/src/parquet/schema/descriptor.h
+++ b/src/parquet/schema/descriptor.h
@@ -98,6 +98,10 @@ class SchemaDescriptor {
     return leaves_.size();
   }
 
+  const schema::NodePtr& schema() const {
+    return schema_;
+  }
+
  private:
   friend class ColumnDescriptor;
 


Mime
View raw message