Repository: parquet-cpp
Updated Branches:
refs/heads/master c11e7d487 -> d51a1dcdd
PARQUET-497: Decouple serialized file internals from the ParquetFileReader public API
This depends on PARQUET-501. A bit of a refactoring bloodbath, but extremely important to
split out these details so that we can instrument the file reader public APIs with test fixtures
for unit testing purposes.
Author: Wes McKinney <wes@cloudera.com>
Closes #47 from wesm/PARQUET-497 and squashes the following commits:
aa152ad [Wes McKinney] Decouple Parquet file format details and Thrift metadata from the ParquetFileReader
and RowGroupReader public APIs.
Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/d51a1dcd
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/d51a1dcd
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/d51a1dcd
Branch: refs/heads/master
Commit: d51a1dcddc9b42b83333581d68607c6299d37aa8
Parents: c11e7d4
Author: Wes McKinney <wes@cloudera.com>
Authored: Thu Feb 11 00:21:32 2016 -0800
Committer: Julien Le Dem <julien@dremio.com>
Committed: Thu Feb 11 00:21:32 2016 -0800
----------------------------------------------------------------------
CMakeLists.txt | 5 +-
example/parquet-dump-schema.cc | 25 +--
example/parquet_reader.cc | 18 +--
src/parquet/CMakeLists.txt | 1 -
src/parquet/file/CMakeLists.txt | 20 +++
src/parquet/file/reader-internal.cc | 166 ++++++++++++++++++++
src/parquet/file/reader-internal.h | 79 ++++++++++
src/parquet/file/reader.cc | 207 ++++++++++++++++++++++++
src/parquet/file/reader.h | 139 ++++++++++++++++
src/parquet/parquet.h | 2 +-
src/parquet/reader-test.cc | 21 ++-
src/parquet/reader.cc | 262 -------------------------------
src/parquet/reader.h | 118 --------------
src/parquet/schema/descriptor.h | 4 +
14 files changed, 638 insertions(+), 429 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 452b315..d262375 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -293,7 +293,9 @@ set(LIBPARQUET_SRCS
src/parquet/column/serialized-page.cc
src/parquet/column/reader.cc
src/parquet/column/scanner.cc
- src/parquet/reader.cc
+
+ src/parquet/file/reader.cc
+ src/parquet/file/reader-internal.cc
src/parquet/schema/converter.cc
src/parquet/schema/descriptor.cc
@@ -331,6 +333,7 @@ add_subdirectory(src/parquet)
add_subdirectory(src/parquet/column)
add_subdirectory(src/parquet/compression)
add_subdirectory(src/parquet/encodings)
+add_subdirectory(src/parquet/file)
add_subdirectory(src/parquet/schema)
add_subdirectory(src/parquet/thrift)
add_subdirectory(src/parquet/util)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/example/parquet-dump-schema.cc
----------------------------------------------------------------------
diff --git a/example/parquet-dump-schema.cc b/example/parquet-dump-schema.cc
index 09c715c..d8eb2ee 100644
--- a/example/parquet-dump-schema.cc
+++ b/example/parquet-dump-schema.cc
@@ -15,7 +15,7 @@
// specific language governing permissions and limitations
// under the License.
-#include <parquet/reader.h>
+#include <parquet/parquet.h>
#include <parquet/schema/converter.h>
#include <parquet/schema/printer.h>
@@ -23,31 +23,12 @@
using namespace parquet_cpp;
-void DumpSchema(const ParquetFileReader* reader) {
- auto schema = reader->metadata().schema;
- schema::FlatSchemaConverter converter(&schema[0], schema.size());
- std::unique_ptr<schema::Node> root = converter.Convert();
-
- PrintSchema(root.get(), std::cout);
-}
-
int main(int argc, char** argv) {
std::string filename = argv[1];
- parquet_cpp::ParquetFileReader reader;
- parquet_cpp::LocalFileSource file;
-
- file.Open(filename);
- if (!file.is_open()) {
- std::cerr << "Could not open file " << file.path()
- << std::endl;
- return -1;
- }
-
try {
- reader.Open(&file);
- reader.ParseMetaData();
- DumpSchema(&reader);
+ std::unique_ptr<ParquetFileReader> reader = ParquetFileReader::OpenFile(filename);
+ PrintSchema(reader->descr()->schema().get(), std::cout);
} catch (const std::exception& e) {
std::cerr << "Parquet error: "
<< e.what()
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/example/parquet_reader.cc
----------------------------------------------------------------------
diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc
index ca717df..8e498c7 100644
--- a/example/parquet_reader.cc
+++ b/example/parquet_reader.cc
@@ -18,6 +18,9 @@
#include <parquet/parquet.h>
#include <iostream>
+#include <memory>
+
+using namespace parquet_cpp;
int main(int argc, char** argv) {
if (argc > 3) {
@@ -39,20 +42,9 @@ int main(int argc, char** argv) {
}
}
- parquet_cpp::ParquetFileReader reader;
- parquet_cpp::LocalFileSource file;
-
- file.Open(filename);
- if (!file.is_open()) {
- std::cerr << "Could not open file " << file.path()
- << std::endl;
- return -1;
- }
-
try {
- reader.Open(&file);
- reader.ParseMetaData();
- reader.DebugPrint(std::cout, print_values);
+ std::unique_ptr<ParquetFileReader> reader = ParquetFileReader::OpenFile(filename);
+ reader->DebugPrint(std::cout, print_values);
} catch (const std::exception& e) {
std::cerr << "Parquet error: "
<< e.what()
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index 2d69ba0..6a47917 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -18,7 +18,6 @@
# Headers: top level
install(FILES
parquet.h
- reader.h
exception.h
types.h
DESTINATION include/parquet)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/file/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/file/CMakeLists.txt b/src/parquet/file/CMakeLists.txt
new file mode 100644
index 0000000..ef6ac01
--- /dev/null
+++ b/src/parquet/file/CMakeLists.txt
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+install(FILES
+ reader.h
+ DESTINATION include/parquet/file)
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
new file mode 100644
index 0000000..7b0a719
--- /dev/null
+++ b/src/parquet/file/reader-internal.cc
@@ -0,0 +1,166 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file/reader-internal.h"
+
+#include <memory>
+#include <vector>
+
+#include "parquet/column/serialized-page.h"
+#include "parquet/schema/converter.h"
+#include "parquet/thrift/util.h"
+#include "parquet/util/input.h"
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// SerializedRowGroup
+
+// Number of column chunks listed in this row group's Thrift metadata.
+int SerializedRowGroup::num_columns() const {
+  const auto chunk_count = metadata_->columns.size();
+  return static_cast<int>(chunk_count);
+}
+
+// Builds a PageReader over the serialized bytes of column chunk `i`.
+//
+// The chunk is copied from the data source into an in-memory stream, starting
+// at the dictionary page when one is recorded ahead of the first data page.
+// Throws ParquetException if `i` is out of range or if the chunk cannot be
+// read in full.
+std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
+  if (i < 0 || i >= num_columns()) {
+    throw ParquetException("Column index is out of range for this row group");
+  }
+
+  // Read column chunk from the file
+  const parquet::ColumnChunk& col = metadata_->columns[i];
+
+  // The chunk starts at whichever offset is smaller: the dictionary page
+  // (when set) or the first data page.
+  int64_t col_start = col.meta_data.data_page_offset;
+  if (col.meta_data.__isset.dictionary_page_offset &&
+      col_start > col.meta_data.dictionary_page_offset) {
+    col_start = col.meta_data.dictionary_page_offset;
+  }
+
+  // TODO(wesm): some input streams (e.g. memory maps) may not require
+  // copying data. This should be added to the input stream API to support
+  // zero-copy streaming
+  //
+  // Keep the concrete stream type while filling the buffer so that no
+  // downcast from InputStream is needed.
+  std::unique_ptr<ScopedInMemoryInputStream> input(
+      new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
+
+  source_->Seek(col_start);
+  size_t bytes_read = source_->Read(input->size(), input->data());
+
+  if (bytes_read != input->size()) {
+    throw ParquetException("Unable to read column chunk data");
+  }
+
+  return std::unique_ptr<PageReader>(
+      new SerializedPageReader(std::move(input), col.meta_data.codec));
+}
+
+// Copies the value count and the null/distinct counts of column chunk `i`
+// out of the Thrift metadata into a Thrift-free RowGroupStatistics.
+RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) {
+  const auto& chunk_meta = metadata_->columns[i].meta_data;
+  const auto& thrift_stats = chunk_meta.statistics;
+
+  RowGroupStatistics stats;
+  stats.num_values = chunk_meta.num_values;
+  stats.null_count = thrift_stats.null_count;
+  stats.distinct_count = thrift_stats.distinct_count;
+  return stats;
+}
+
+// ----------------------------------------------------------------------
+// SerializedFile: Parquet on-disk layout
+
+// Size in bytes of the trailer at the end of the file. ParseMetaData reads
+// its first 4 bytes as the metadata length and compares its last 4 bytes
+// against PARQUET_MAGIC.
+static constexpr uint32_t FOOTER_SIZE = 8;
+// Magic bytes expected in the last 4 bytes of the file.
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// Creates a SerializedFile over `source` (taking ownership of it), validates
+// the footer and parses the file metadata, then returns the result as the
+// generic Contents interface. Any ParquetException raised while parsing
+// propagates to the caller.
+std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open(
+    std::unique_ptr<RandomAccessSource> source) {
+  // Hold the concrete type during setup: ParseMetaData is private to
+  // SerializedFile, and if it throws, the object is destroyed through its
+  // own type rather than through the Contents base.
+  std::unique_ptr<SerializedFile> file(new SerializedFile(std::move(source)));
+
+  // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
+  file->ParseMetaData();
+
+  return std::unique_ptr<ParquetFileReader::Contents>(std::move(file));
+}
+
+// Closes the owned data source by forwarding to its Close().
+void SerializedFile::Close() {
+ source_->Close();
+}
+
+// Creates a RowGroupReader for row group `i`.
+//
+// The reader is handed raw pointers to this file's data source, schema and
+// row group metadata, so it must not be used after the SerializedFile is
+// destroyed. Throws ParquetException if `i` is out of range.
+std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
+  if (i < 0 || i >= num_row_groups()) {
+    throw ParquetException("Row group index is out of range");
+  }
+
+  std::unique_ptr<SerializedRowGroup> contents(new SerializedRowGroup(
+      source_.get(), &schema_, &metadata_.row_groups[i]));
+
+  return std::make_shared<RowGroupReader>(&schema_, std::move(contents));
+}
+
+// Total row count as recorded in the num_rows field of the file metadata.
+int64_t SerializedFile::num_rows() const {
+ return metadata_.num_rows;
+}
+
+// Number of columns as reported by the schema descriptor.
+int SerializedFile::num_columns() const {
+ return schema_.num_columns();
+}
+
+// Number of row groups listed in the file metadata.
+int SerializedFile::num_row_groups() const {
+ return metadata_.row_groups.size();
+}
+
+// Takes ownership of the data source. No metadata is parsed here; Open()
+// calls ParseMetaData() right after construction.
+SerializedFile::SerializedFile(std::unique_ptr<RandomAccessSource> source) :
+ source_(std::move(source)) {}
+
+
+// Reads and validates the file footer, deserializes the Thrift FileMetaData
+// and initializes the SchemaDescriptor from its flattened schema.
+//
+// The last FOOTER_SIZE bytes of the file hold a 4-byte metadata length
+// followed by the 4-byte magic "PAR1"; the serialized metadata sits directly
+// in front of them. Throws ParquetException when the file is too small or
+// the footer or metadata is malformed.
+void SerializedFile::ParseMetaData() {
+  size_t filesize = source_->Size();
+
+  if (filesize < FOOTER_SIZE) {
+    throw ParquetException("Corrupted file, smaller than file footer");
+  }
+
+  uint8_t footer_buffer[FOOTER_SIZE];
+  source_->Seek(filesize - FOOTER_SIZE);
+  size_t bytes_read = source_->Read(FOOTER_SIZE, footer_buffer);
+
+  if (bytes_read != FOOTER_SIZE ||
+      memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
+    throw ParquetException("Invalid parquet file. Corrupt footer.");
+  }
+
+  // The Parquet format stores the length little-endian. Decoding it byte by
+  // byte keeps the result independent of host byte order and of the
+  // alignment of footer_buffer.
+  uint32_t metadata_len =
+      static_cast<uint32_t>(footer_buffer[0]) |
+      (static_cast<uint32_t>(footer_buffer[1]) << 8) |
+      (static_cast<uint32_t>(footer_buffer[2]) << 16) |
+      (static_cast<uint32_t>(footer_buffer[3]) << 24);
+
+  // Compare against the space in front of the footer instead of adding
+  // FOOTER_SIZE to the length: that sum is computed in 32 bits and wraps for
+  // lengths close to UINT32_MAX, which would let a corrupt length through.
+  if (metadata_len > filesize - FOOTER_SIZE) {
+    throw ParquetException("Invalid parquet file. File is less than "
+                           "file metadata size.");
+  }
+  if (metadata_len == 0) {
+    throw ParquetException("Invalid parquet file. File metadata is empty.");
+  }
+
+  // Safe to compute only now that the length is known to fit in the file
+  size_t metadata_start = filesize - FOOTER_SIZE - metadata_len;
+  source_->Seek(metadata_start);
+
+  std::vector<uint8_t> metadata_buffer(metadata_len);
+  bytes_read = source_->Read(metadata_len, &metadata_buffer[0]);
+  if (bytes_read != metadata_len) {
+    throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
+  }
+
+  DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_);
+
+  // The converter is handed a pointer to the first schema element, so an
+  // empty schema has to be rejected before taking that address.
+  if (metadata_.schema.empty()) {
+    throw ParquetException("Invalid parquet file. File schema is empty.");
+  }
+
+  schema::FlatSchemaConverter converter(&metadata_.schema[0],
+                                        metadata_.schema.size());
+  schema_.Init(converter.Convert());
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
new file mode 100644
index 0000000..8ba105e
--- /dev/null
+++ b/src/parquet/file/reader-internal.h
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_FILE_READER_INTERNAL_H
+#define PARQUET_FILE_READER_INTERNAL_H
+
+#include "parquet/file/reader.h"
+
+#include <memory>
+
+#include "parquet/schema/descriptor.h"
+#include "parquet/util/input.h"
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet_cpp {
+
+// RowGroupReader::Contents implementation for the Parquet file specification.
+//
+// Holds non-owning pointers to the data source, the schema descriptor and the
+// Thrift row group metadata; all three must outlive this object.
+class SerializedRowGroup : public RowGroupReader::Contents {
+ public:
+  SerializedRowGroup(RandomAccessSource* source, const SchemaDescriptor* schema,
+      const parquet::RowGroup* metadata) :
+      source_(source),
+      schema_(schema),
+      metadata_(metadata) {}
+
+  // `override` makes the compiler reject any drift from the Contents
+  // interface instead of silently declaring a new, unrelated virtual.
+  virtual int num_columns() const override;
+  virtual std::unique_ptr<PageReader> GetColumnPageReader(int i) override;
+  virtual RowGroupStatistics GetColumnStats(int i) override;
+
+ private:
+  RandomAccessSource* source_;
+  const SchemaDescriptor* schema_;
+  const parquet::RowGroup* metadata_;
+};
+
+// An implementation of ParquetFileReader::Contents that deals with the Parquet
+// file structure, Thrift deserialization, and other internal matters
+
+class SerializedFile : public ParquetFileReader::Contents {
+ public:
+  // Open the file, validate the footer, and parse the Thrift metadata.
+  //
+  // The returned object takes ownership of the data source, which is
+  // destroyed together with it.
+  static std::unique_ptr<ParquetFileReader::Contents> Open(
+      std::unique_ptr<RandomAccessSource> source);
+
+  // `override` makes the compiler reject any drift from the Contents
+  // interface instead of silently declaring a new, unrelated virtual.
+  virtual void Close() override;
+  virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i) override;
+  virtual int64_t num_rows() const override;
+  virtual int num_columns() const override;
+  virtual int num_row_groups() const override;
+
+ private:
+  // This class takes ownership of the provided data source
+  explicit SerializedFile(std::unique_ptr<RandomAccessSource> source);
+
+  std::unique_ptr<RandomAccessSource> source_;
+  parquet::FileMetaData metadata_;
+
+  // Validates the footer, deserializes metadata_ and initializes schema_
+  void ParseMetaData();
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_FILE_READER_INTERNAL_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
new file mode 100644
index 0000000..6ef59ed
--- /dev/null
+++ b/src/parquet/file/reader.cc
@@ -0,0 +1,207 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file/reader.h"
+
+#include <cstdio>
+#include <cstring>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "parquet/column/reader.h"
+#include "parquet/column/scanner.h"
+
+#include "parquet/exception.h"
+#include "parquet/file/reader-internal.h"
+
+using std::string;
+using std::vector;
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// RowGroupReader public API
+
+// Stores the (non-owned) schema pointer and takes ownership of the Contents
+// implementation that supplies page readers and statistics.
+RowGroupReader::RowGroupReader(const SchemaDescriptor* schema,
+ std::unique_ptr<Contents> contents) :
+ schema_(schema),
+ contents_(std::move(contents)) {}
+
+// Number of columns as reported by the Contents implementation.
+int RowGroupReader::num_columns() const {
+ return contents_->num_columns();
+}
+
+// Returns the ColumnReader for row-group-relative column `i`, constructing
+// and caching it on first use; later calls return the same shared instance.
+// Throws ParquetException if `i` is out of range.
+std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
+  if (i < 0 || i >= num_columns()) {
+    std::stringstream ss;
+    ss << "The row group only has " << num_columns()
+       << " columns, requested reader for: "
+       << i;
+    throw ParquetException(ss.str());
+  }
+
+  auto it = column_readers_.find(i);
+  if (it != column_readers_.end()) {
+    // Already have constructed the ColumnReader
+    return it->second;
+  }
+
+  const ColumnDescriptor* descr = schema_->Column(i);
+
+  std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
+  std::shared_ptr<ColumnReader> reader = ColumnReader::Make(descr,
+      std::move(page_reader));
+  column_readers_[i] = reader;
+  return reader;
+}
+
+// Forwards to the Contents implementation for the statistics of column `i`.
+// The index is passed through unchecked.
+RowGroupStatistics RowGroupReader::GetColumnStats(int i) const {
+ return contents_->GetColumnStats(i);
+}
+
+// ----------------------------------------------------------------------
+// ParquetFileReader public API
+
+// Constructs a reader with no contents attached; schema_ stays null until
+// Open() is called.
+ParquetFileReader::ParquetFileReader() : schema_(nullptr) {}
+// Does not call Close(); the members release what they own on destruction.
+ParquetFileReader::~ParquetFileReader() {}
+
+// Convenience factory: opens the Parquet file at `path` on local disk, parses
+// its metadata and returns a reader that owns the file handle.
+// Throws ParquetException if the file cannot be opened or is not valid.
+std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
+    const std::string& path) {
+  std::unique_ptr<LocalFileSource> file(new LocalFileSource());
+  file->Open(path);
+
+  // Callers used to check is_open() themselves before handing the source to
+  // the reader; do it here so a missing file is reported as such instead of
+  // failing later while reading the footer.
+  if (!file->is_open()) {
+    throw ParquetException("Could not open file " + path);
+  }
+
+  auto contents = SerializedFile::Open(std::move(file));
+
+  std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
+  result->Open(std::move(contents));
+
+  return result;
+}
+
+// Attaches a Contents implementation to this reader, taking ownership of it,
+// and caches the schema descriptor it exposes.
+void ParquetFileReader::Open(
+    std::unique_ptr<ParquetFileReader::Contents> contents) {
+  // Row group readers cached from previously attached contents may point
+  // into those contents; drop them before the old contents are released so
+  // RowGroup() never hands out a reader for the previous file.
+  row_group_readers_.clear();
+
+  contents_ = std::move(contents);
+  schema_ = contents_->schema();
+}
+
+// Performs the cleanup associated with the attached file contents.
+// Does nothing on a reader that was never opened.
+void ParquetFileReader::Close() {
+  if (contents_) {
+    contents_->Close();
+  }
+}
+
+// Number of row groups as reported by the attached Contents.
+int ParquetFileReader::num_row_groups() const {
+ return contents_->num_row_groups();
+}
+
+// Total number of rows as reported by the attached Contents.
+int64_t ParquetFileReader::num_rows() const {
+ return contents_->num_rows();
+}
+
+// Number of columns, taken from the cached schema descriptor rather than
+// from Contents::num_columns().
+int ParquetFileReader::num_columns() const {
+ return schema_->num_columns();
+}
+
+// Returns the reader for row group `i`, creating and caching it on first use.
+// The returned pointer is owned by this ParquetFileReader.
+// Throws ParquetException if `i` is out of range.
+RowGroupReader* ParquetFileReader::RowGroup(int i) {
+  if (i < 0 || i >= num_row_groups()) {
+    std::stringstream ss;
+    ss << "The file only has " << num_row_groups()
+       << " row groups, requested reader for: "
+       << i;
+    throw ParquetException(ss.str());
+  }
+
+  auto it = row_group_readers_.find(i);
+  if (it == row_group_readers_.end()) {
+    // GetRowGroup runs before anything is inserted, so a throw from it
+    // cannot leave a null reader cached under this index.
+    it = row_group_readers_.emplace(i, contents_->GetRowGroup(i)).first;
+  }
+  return it->second.get();
+}
+
+// ----------------------------------------------------------------------
+// ParquetFileReader::DebugPrint
+
+// the fixed initial size is just for an example
+#define COL_WIDTH "20"
+
+
+// Writes a human-readable summary of the file to `stream`: the row count,
+// one line per schema column, and per-row-group column statistics. When
+// `print_values` is true, the values of every row group are also printed as
+// a table with one column per file column.
+void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
+  stream << "File statistics:\n";
+  stream << "Total rows: " << this->num_rows() << "\n";
+
+  for (int i = 0; i < num_columns(); ++i) {
+    const ColumnDescriptor* descr = schema_->Column(i);
+    stream << "Column " << i << ": "
+           << descr->name()
+           << " ("
+           << type_to_string(descr->physical_type())
+           << ")\n";
+  }
+
+  // Scratch space for one formatted header cell; snprintf truncates column
+  // names that do not fit.
+  static constexpr size_t bufsize = 25;
+  char buffer[bufsize];
+
+  for (int r = 0; r < num_row_groups(); ++r) {
+    stream << "--- Row Group " << r << " ---\n";
+
+    RowGroupReader* group_reader = RowGroup(r);
+
+    // Print column metadata. Kept as int so the index loops below compare
+    // like with like, and named so it does not shadow num_columns().
+    const int group_columns = group_reader->num_columns();
+
+    for (int i = 0; i < group_columns; ++i) {
+      RowGroupStatistics stats = group_reader->GetColumnStats(i);
+
+      stream << "Column " << i << ": "
+             << stats.num_values << " rows, "
+             << stats.null_count << " null values, "
+             << stats.distinct_count << " distinct values, "
+             << "\n";
+    }
+
+    if (!print_values) {
+      continue;
+    }
+
+    // Create readers for all columns and print contents
+    vector<std::shared_ptr<Scanner> > scanners(group_columns);
+    for (int i = 0; i < group_columns; ++i) {
+      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
+
+      // Header cell: column name left-justified in COL_WIDTH characters
+      snprintf(buffer, bufsize, "%-" COL_WIDTH "s",
+          column_schema(i)->name().c_str());
+      stream << buffer;
+
+      // This is OK in this method as long as the RowGroupReader does not get
+      // deleted
+      scanners[i] = Scanner::Make(col_reader);
+    }
+    stream << "\n";
+
+    // Print one value per column per line until every scanner is exhausted
+    bool hasRow;
+    do {
+      hasRow = false;
+      for (int i = 0; i < group_columns; ++i) {
+        if (scanners[i]->HasNext()) {
+          hasRow = true;
+          scanners[i]->PrintNext(stream, 17);
+        }
+      }
+      stream << "\n";
+    } while (hasRow);
+  }
+}
+
+} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
new file mode 100644
index 0000000..3ff8697
--- /dev/null
+++ b/src/parquet/file/reader.h
@@ -0,0 +1,139 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_FILE_READER_H
+#define PARQUET_FILE_READER_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+#include <stdio.h>
+#include <unordered_map>
+
+#include "parquet/types.h"
+#include "parquet/schema/descriptor.h"
+
+// TODO(wesm): Still depends on Thrift
+#include "parquet/column/page.h"
+
+namespace parquet_cpp {
+
+class ColumnReader;
+class ParquetFileReader;
+
+// Thrift-free summary of one column chunk within a row group. The members
+// have no default initializers; SerializedRowGroup::GetColumnStats fills all
+// three from the column chunk metadata.
+struct RowGroupStatistics {
+ // Copied from the column metadata's num_values
+ int64_t num_values;
+ // Copied from the column statistics' null_count
+ int64_t null_count;
+ // Copied from the column statistics' distinct_count
+ int64_t distinct_count;
+};
+
+// Public, format-agnostic reader for a single row group. Everything that
+// depends on the storage format is delegated to a Contents implementation.
+class RowGroupReader {
+ public:
+  // Interface implemented by the storage backend (or by a test fixture)
+  struct Contents {
+    // Implementations are owned and destroyed through a
+    // std::unique_ptr<Contents>, which requires a virtual destructor.
+    virtual ~Contents() {}
+
+    virtual int num_columns() const = 0;
+    virtual RowGroupStatistics GetColumnStats(int i) = 0;
+    virtual std::unique_ptr<PageReader> GetColumnPageReader(int i) = 0;
+  };
+
+  RowGroupReader(const SchemaDescriptor* schema, std::unique_ptr<Contents> contents);
+
+  // Construct a ColumnReader for the indicated row group-relative
+  // column. Ownership is shared with the RowGroupReader.
+  std::shared_ptr<ColumnReader> Column(int i);
+  int num_columns() const;
+
+  RowGroupStatistics GetColumnStats(int i) const;
+
+ private:
+  // Owned by the parent ParquetFileReader
+  const SchemaDescriptor* schema_;
+
+  // PIMPL idiom
+  // The concrete implementation lives outside this header so that we can
+  // hide compiled Thrift headers from the public API and also more easily
+  // create test fixtures.
+  std::unique_ptr<Contents> contents_;
+
+  // Column index -> ColumnReader
+  std::unordered_map<int, std::shared_ptr<ColumnReader> > column_readers_;
+};
+
+
+// Public, format-agnostic reader for a Parquet file. Everything that depends
+// on the serialized layout is delegated to a Contents implementation.
+class ParquetFileReader {
+ public:
+  // Interface implemented by the storage backend (or by a test fixture)
+  struct Contents {
+    // Implementations are owned and destroyed through a
+    // std::unique_ptr<Contents>, which requires a virtual destructor.
+    virtual ~Contents() {}
+
+    // Perform any cleanup associated with the file contents
+    virtual void Close() = 0;
+
+    virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i) = 0;
+
+    virtual int64_t num_rows() const = 0;
+    virtual int num_columns() const = 0;
+    virtual int num_row_groups() const = 0;
+
+    // Return const-pointer to make it clear that this object is not to be copied
+    const SchemaDescriptor* schema() const {
+      return &schema_;
+    }
+    SchemaDescriptor schema_;
+  };
+
+  ParquetFileReader();
+  ~ParquetFileReader();
+
+  // API Convenience to open a serialized Parquet file on disk
+  static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path);
+
+  // Attach the contents to read from; the reader takes ownership of them
+  void Open(std::unique_ptr<Contents> contents);
+  void Close();
+
+  // The RowGroupReader is owned by the FileReader
+  RowGroupReader* RowGroup(int i);
+
+  int num_columns() const;
+  int64_t num_rows() const;
+  int num_row_groups() const;
+
+  // Returns the file schema descriptor (null until Open() has been called)
+  const SchemaDescriptor* descr() const {
+    return schema_;
+  }
+
+  const ColumnDescriptor* column_schema(int i) const {
+    return schema_->Column(i);
+  }
+
+  void DebugPrint(std::ostream& stream, bool print_values = true);
+
+ private:
+  // PIMPL idiom
+  // The concrete implementation lives outside this header so that we can
+  // hide compiled Thrift headers from the public API and also more easily
+  // create test fixtures.
+  std::unique_ptr<Contents> contents_;
+
+  // The SchemaDescriptor is provided by the Contents impl
+  const SchemaDescriptor* schema_;
+
+  // Row group index -> RowGroupReader
+  std::unordered_map<int, std::shared_ptr<RowGroupReader> > row_group_readers_;
+};
+
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_FILE_READER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/parquet.h
----------------------------------------------------------------------
diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h
index 7030d0e..b8624ae 100644
--- a/src/parquet/parquet.h
+++ b/src/parquet/parquet.h
@@ -27,8 +27,8 @@
#include <vector>
#include "parquet/exception.h"
-#include "parquet/reader.h"
#include "parquet/column/reader.h"
+#include "parquet/file/reader.h"
#include "parquet/util/input.h"
#include "parquet/util/output.h"
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index 8da8b99..8599e7e 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -17,12 +17,13 @@
#include <cstdlib>
#include <iostream>
+#include <memory>
#include <sstream>
#include <string>
#include <gtest/gtest.h>
-#include "parquet/reader.h"
+#include "parquet/file/reader.h"
#include "parquet/column/reader.h"
#include "parquet/column/scanner.h"
#include "parquet/util/input.h"
@@ -41,24 +42,22 @@ class TestAllTypesPlain : public ::testing::Test {
std::stringstream ss;
ss << dir_string << "/" << "alltypes_plain.parquet";
- file_.Open(ss.str());
- reader_.Open(&file_);
+
+ reader_ = ParquetFileReader::OpenFile(ss.str());
}
void TearDown() {}
protected:
- LocalFileSource file_;
- ParquetFileReader reader_;
+ std::unique_ptr<ParquetFileReader> reader_;
};
-TEST_F(TestAllTypesPlain, ParseMetaData) {
- reader_.ParseMetaData();
+TEST_F(TestAllTypesPlain, NoopConstructDestruct) {
}
TEST_F(TestAllTypesPlain, TestBatchRead) {
- RowGroupReader* group = reader_.RowGroup(0);
+ RowGroupReader* group = reader_->RowGroup(0);
// column 0, id
std::shared_ptr<Int32Reader> col =
@@ -86,7 +85,7 @@ TEST_F(TestAllTypesPlain, TestBatchRead) {
}
TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
- RowGroupReader* group = reader_.RowGroup(0);
+ RowGroupReader* group = reader_->RowGroup(0);
// column 0, id
std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
@@ -103,7 +102,7 @@ TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) {
- RowGroupReader* group = reader_.RowGroup(0);
+ RowGroupReader* group = reader_->RowGroup(0);
// column 0, id
std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
@@ -118,7 +117,7 @@ TEST_F(TestAllTypesPlain, DebugPrintWorks) {
std::stringstream ss;
// Automatically parses metadata
- reader_.DebugPrint(ss);
+ reader_->DebugPrint(ss);
std::string result = ss.str();
ASSERT_GT(result.size(), 0);
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
deleted file mode 100644
index 3fcce90..0000000
--- a/src/parquet/reader.cc
+++ /dev/null
@@ -1,262 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/reader.h"
-
-#include <cstdio>
-#include <cstring>
-#include <memory>
-#include <sstream>
-#include <string>
-#include <vector>
-
-#include "parquet/column/reader.h"
-#include "parquet/column/serialized-page.h"
-#include "parquet/column/scanner.h"
-
-#include "parquet/exception.h"
-#include "parquet/schema/converter.h"
-#include "parquet/thrift/util.h"
-
-using std::string;
-using std::vector;
-
-namespace parquet_cpp {
-
-// ----------------------------------------------------------------------
-// RowGroupReader
-
-std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) {
- // TODO: boundschecking
- auto it = column_readers_.find(i);
- if (it != column_readers_.end()) {
- // Already have constructed the ColumnReader
- return it->second;
- }
-
- const parquet::ColumnChunk& col = row_group_->columns[i];
-
- size_t col_start = col.meta_data.data_page_offset;
- if (col.meta_data.__isset.dictionary_page_offset &&
- col_start > col.meta_data.dictionary_page_offset) {
- col_start = col.meta_data.dictionary_page_offset;
- }
-
- std::unique_ptr<InputStream> input(
- new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
-
- RandomAccessSource* source = this->parent_->buffer_;
-
- source->Seek(col_start);
-
- // TODO(wesm): Law of demeter violation
- ScopedInMemoryInputStream* scoped_input =
- static_cast<ScopedInMemoryInputStream*>(input.get());
- size_t bytes_read = source->Read(scoped_input->size(), scoped_input->data());
- if (bytes_read != scoped_input->size()) {
- std::cout << "Bytes needed: " << col.meta_data.total_compressed_size << std::endl;
- std::cout << "Bytes read: " << bytes_read << std::endl;
- throw ParquetException("Unable to read column chunk data");
- }
-
- const ColumnDescriptor* descr = parent_->column_descr(i);
-
- std::unique_ptr<PageReader> pager(
- new SerializedPageReader(std::move(input), col.meta_data.codec));
-
- std::shared_ptr<ColumnReader> reader = ColumnReader::Make(descr,
- std::move(pager));
- column_readers_[i] = reader;
-
- return reader;
-}
-
-// ----------------------------------------------------------------------
-// ParquetFileReader
-
-// 4 byte constant + 4 byte metadata len
-static constexpr uint32_t FOOTER_SIZE = 8;
-static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-ParquetFileReader::ParquetFileReader() :
- parsed_metadata_(false),
- buffer_(nullptr) {}
-
-ParquetFileReader::~ParquetFileReader() {}
-
-void ParquetFileReader::Open(RandomAccessSource* buffer) {
- buffer_ = buffer;
-}
-
-void ParquetFileReader::Close() {
- buffer_->Close();
-}
-
-RowGroupReader* ParquetFileReader::RowGroup(size_t i) {
- if (!parsed_metadata_) {
- ParseMetaData();
- }
-
- if (i >= num_row_groups()) {
- std::stringstream ss;
- ss << "The file only has " << num_row_groups()
- << "row groups, requested reader for: "
- << i;
- throw ParquetException(ss.str());
- }
-
- auto it = row_group_readers_.find(i);
- if (it != row_group_readers_.end()) {
- // Constructed the RowGroupReader already
- return it->second.get();
- }
- if (!parsed_metadata_) {
- ParseMetaData();
- }
-
- // Construct the RowGroupReader
- row_group_readers_[i] = std::make_shared<RowGroupReader>(this,
- &metadata_.row_groups[i]);
- return row_group_readers_[i].get();
-}
-
-void ParquetFileReader::ParseMetaData() {
- size_t filesize = buffer_->Size();
-
- if (filesize < FOOTER_SIZE) {
- throw ParquetException("Corrupted file, smaller than file footer");
- }
-
- uint8_t footer_buffer[FOOTER_SIZE];
-
- buffer_->Seek(filesize - FOOTER_SIZE);
-
- size_t bytes_read = buffer_->Read(FOOTER_SIZE, footer_buffer);
-
- if (bytes_read != FOOTER_SIZE) {
- throw ParquetException("Invalid parquet file. Corrupt footer.");
- }
- if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
- throw ParquetException("Invalid parquet file. Corrupt footer.");
- }
-
- uint32_t metadata_len = *reinterpret_cast<uint32_t*>(footer_buffer);
- size_t metadata_start = filesize - FOOTER_SIZE - metadata_len;
- if (FOOTER_SIZE + metadata_len > filesize) {
- throw ParquetException("Invalid parquet file. File is less than file metadata size.");
- }
-
- buffer_->Seek(metadata_start);
-
- std::vector<uint8_t> metadata_buffer(metadata_len);
- bytes_read = buffer_->Read(metadata_len, &metadata_buffer[0]);
- if (bytes_read != metadata_len) {
- throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
- }
- DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_);
-
- schema::FlatSchemaConverter converter(&metadata_.schema[0],
- metadata_.schema.size());
- schema_descr_.Init(converter.Convert());
-
- parsed_metadata_ = true;
-}
-
-// ----------------------------------------------------------------------
-// ParquetFileReader::DebugPrint
-
-// the fixed initial size is just for an example
-#define COL_WIDTH "20"
-
-
-void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
- if (!parsed_metadata_) {
- ParseMetaData();
- }
-
- stream << "File statistics:\n";
- stream << "Total rows: " << metadata_.num_rows << "\n";
- for (int i = 0; i < num_columns(); ++i) {
- const ColumnDescriptor* descr = column_descr(i);
- stream << "Column " << i << ": "
- << descr->name()
- << " ("
- << type_to_string(descr->physical_type())
- << ")" << std::endl;
- }
-
- for (int r = 0; r < num_row_groups(); ++r) {
- stream << "--- Row Group " << r << " ---\n";
-
- RowGroupReader* group_reader = RowGroup(r);
-
- // Print column metadata
- size_t num_columns = group_reader->num_columns();
-
- for (int i = 0; i < num_columns; ++i) {
- const parquet::ColumnMetaData* meta_data = group_reader->column_metadata(i);
- stream << "Column " << i << ": "
- << meta_data->num_values << " rows, "
- << meta_data->statistics.null_count << " null values, "
- << meta_data->statistics.distinct_count << " distinct values, "
- << "min value: " << (meta_data->statistics.min.length() > 0 ?
- meta_data->statistics.min : "N/A")
- << ", max value: " << (meta_data->statistics.max.length() > 0 ?
- meta_data->statistics.max : "N/A") << ".\n";
- }
-
- if (!print_values) {
- continue;
- }
-
- static constexpr size_t bufsize = 25;
- char buffer[bufsize];
-
- // Create readers for all columns and print contents
- vector<std::shared_ptr<Scanner> > scanners(num_columns, NULL);
- for (int i = 0; i < num_columns; ++i) {
- std::shared_ptr<ColumnReader> col_reader = group_reader->Column(i);
- Type::type col_type = col_reader->type();
-
- std::stringstream ss;
- ss << "%-" << COL_WIDTH << "s";
- std::string fmt = ss.str();
-
- snprintf(buffer, bufsize, fmt.c_str(), column_descr(i)->name().c_str());
- stream << buffer;
-
- // This is OK in this method as long as the RowGroupReader does not get
- // deleted
- scanners[i] = Scanner::Make(col_reader);
- }
- stream << "\n";
-
- bool hasRow;
- do {
- hasRow = false;
- for (int i = 0; i < num_columns; ++i) {
- if (scanners[i]->HasNext()) {
- hasRow = true;
- scanners[i]->PrintNext(stream, 17);
- }
- }
- stream << "\n";
- } while (hasRow);
- }
-}
-
-} // namespace parquet_cpp
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/reader.h b/src/parquet/reader.h
deleted file mode 100644
index 3a9dc5d..0000000
--- a/src/parquet/reader.h
+++ /dev/null
@@ -1,118 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_FILE_READER_H
-#define PARQUET_FILE_READER_H
-
-#include <cstdint>
-#include <memory>
-#include <string>
-#include <stdio.h>
-#include <unordered_map>
-
-#include "parquet/thrift/parquet_types.h"
-
-#include "parquet/types.h"
-#include "parquet/schema/descriptor.h"
-#include "parquet/util/input.h"
-
-namespace parquet_cpp {
-
-class ColumnReader;
-class ParquetFileReader;
-
-class RowGroupReader {
- public:
- RowGroupReader(ParquetFileReader* parent, parquet::RowGroup* group) :
- parent_(parent),
- row_group_(group) {}
-
- // Construct a ColumnReader for the indicated row group-relative
- // column. Ownership is shared with the RowGroupReader.
- std::shared_ptr<ColumnReader> Column(size_t i);
-
- const parquet::ColumnMetaData* column_metadata(size_t i) const {
- return &row_group_->columns[i].meta_data;
- }
-
- size_t num_columns() const {
- return row_group_->columns.size();
- }
-
- private:
- friend class ParquetFileReader;
-
- ParquetFileReader* parent_;
- parquet::RowGroup* row_group_;
-
- // Column index -> ColumnReader
- std::unordered_map<int, std::shared_ptr<ColumnReader> > column_readers_;
-};
-
-
-class ParquetFileReader {
- public:
- ParquetFileReader();
- ~ParquetFileReader();
-
- // This class does _not_ take ownership of the file. You must manage its
- // lifetime separately
- void Open(RandomAccessSource* buffer);
-
- void Close();
-
- void ParseMetaData();
-
- // The RowGroupReader is owned by the FileReader
- RowGroupReader* RowGroup(size_t i);
-
- size_t num_row_groups() const {
- return metadata_.row_groups.size();
- }
-
- const ColumnDescriptor* column_descr(size_t i) const {
- return schema_descr_.Column(i);
- }
-
- size_t num_columns() const {
- return schema_descr_.num_columns();
- }
-
- const parquet::FileMetaData& metadata() const {
- return metadata_;
- }
-
- void DebugPrint(std::ostream& stream, bool print_values = true);
-
- private:
- friend class RowGroupReader;
-
- parquet::FileMetaData metadata_;
- SchemaDescriptor schema_descr_;
-
- bool parsed_metadata_;
-
- // Row group index -> RowGroupReader
- std::unordered_map<int, std::shared_ptr<RowGroupReader> > row_group_readers_;
-
- RandomAccessSource* buffer_;
-};
-
-
-} // namespace parquet_cpp
-
-#endif // PARQUET_FILE_READER_H
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d51a1dcd/src/parquet/schema/descriptor.h
----------------------------------------------------------------------
diff --git a/src/parquet/schema/descriptor.h b/src/parquet/schema/descriptor.h
index 144666f..d27dcc1 100644
--- a/src/parquet/schema/descriptor.h
+++ b/src/parquet/schema/descriptor.h
@@ -98,6 +98,10 @@ class SchemaDescriptor {
return leaves_.size();
}
+ const schema::NodePtr& schema() const {
+ return schema_;
+ }
+
private:
friend class ColumnDescriptor;
|