From: wesm@apache.org
To: "commits@parquet.apache.org"
Date: Tue, 12 Dec 2017 19:12:15 +0000
Subject: [parquet-cpp] branch master updated: PARQUET-859: Flatten parquet/file directory, consolidate file reader, file writer code
Message-ID: <151310593524.32670.5438458846727991650@gitbox.apache.org>
X-Git-Repo: parquet-cpp
X-Git-Refname: refs/heads/master
X-Git-Oldrev: d2ac53c0d467eb04d02e203fa5bf7cad5825f580
X-Git-Newrev: 2cf514eba7227e53d4e6cc1abd814084dfa67dfd

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-cpp.git

The following commit(s) were added to refs/heads/master by this push:
     new 2cf514e  PARQUET-859: Flatten parquet/file directory, consolidate file reader, file writer code

2cf514e is described below

commit 2cf514eba7227e53d4e6cc1abd814084dfa67dfd
Author: Wes McKinney
AuthorDate: Tue Dec 12 14:12:08 2017 -0500

    PARQUET-859: Flatten parquet/file directory, consolidate file reader, file writer code

    I believe this makes the codebase simpler and easier to navigate.
    By consolidating the file reader/writer code, further refactoring and
    internal improvements will be more straightforward.

    Author: Wes McKinney

    Closes #424 from wesm/PARQUET-859 and squashes the following commits:

    c98e3d4 [Wes McKinney] Do not generate Thrift files in source directory
    e987f47 [Wes McKinney] Remove superfluous PARQUET_EXPORT
    0fb83a8 [Wes McKinney] Consolidate file_reader.h, file_reader-internal.h
    ac6e2c5 [Wes McKinney] First cut flattening of parquet/file directory and consolidate writer, writer-internal headers
---
 CMakeLists.txt                                     |  27 +-
 src/parquet/CMakeLists.txt                         |   7 +
 src/parquet/api/reader.h                           |   8 +-
 src/parquet/api/writer.h                           |  11 +-
 src/parquet/arrow/arrow-reader-writer-benchmark.cc |   5 +-
 src/parquet/arrow/arrow-reader-writer-test.cc      |   2 +-
 src/parquet/arrow/record_reader.h                  |   2 +-
 src/parquet/column-io-benchmark.cc                 |  12 +-
 src/parquet/column_page.h                          |  29 --
 src/parquet/column_reader.cc                       | 174 +++++++++++
 src/parquet/column_reader.h                        |  24 ++
 src/parquet/column_writer-test.cc                  |  11 +-
 src/parquet/column_writer.cc                       | 168 +++++++++++
 src/parquet/column_writer.h                        |  24 +-
 src/parquet/encoding-benchmark.cc                  |   2 -
 src/parquet/{file => }/file-deserialize-test.cc    |  12 +-
 src/parquet/{file => }/file-serialize-test.cc      |   4 +-
 src/parquet/file/CMakeLists.txt                    |  27 --
 src/parquet/file/reader-internal.cc                | 309 -------------------
 src/parquet/file/reader-internal.h                 | 133 ---------
 src/parquet/file/writer-internal.cc                | 327 ---------------------
 src/parquet/file/writer-internal.h                 | 153 ----------
 src/parquet/file/writer.cc                         | 119 --------
 src/parquet/{file/reader.cc => file_reader.cc}     | 179 ++++++++++-
 src/parquet/{file/reader.h => file_reader.h}       |   9 +-
 src/parquet/file_writer.cc                         | 323 ++++++++++++++++++++
 src/parquet/{file/writer.h => file_writer.h}       |   2 +-
 .../file-metadata-test.cc => metadata-test.cc}     |   2 +-
 src/parquet/{file => }/metadata.cc                 |   2 +-
 src/parquet/{file => }/metadata.h                  |   0
 src/parquet/{file => }/printer.cc                  |   2 +-
 src/parquet/{file => }/printer.h                   |   2 +-
 src/parquet/properties-test.cc                     |   2 +-
 src/parquet/reader-test.cc                         |   7 +-
 src/parquet/statistics-test.cc                     |   9 +-
 src/parquet/test-util.h                            |   6 +-
 36 files changed, 960 insertions(+), 1175 deletions(-)

diff --git a/CMakeLists.txt b/CMakeLists.txt index 8a3558c..4774631 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -375,6 +375,7 @@ enable_testing() # Dependencies ############################################################ +include_directories(${CMAKE_CURRENT_BINARY_DIR}/src) include_directories( ${CMAKE_CURRENT_SOURCE_DIR}/src ) @@ -661,7 +662,7 @@ if (NOT MSVC) endif() # List of thrift output targets -set(THRIFT_OUTPUT_DIR ${CMAKE_CURRENT_SOURCE_DIR}/src/parquet) +set(THRIFT_OUTPUT_DIR ${CMAKE_CURRENT_BINARY_DIR}/src/parquet) set(THRIFT_OUTPUT_FILES "${THRIFT_OUTPUT_DIR}/parquet_types.cpp") set(THRIFT_OUTPUT_FILES ${THRIFT_OUTPUT_FILES} "${THRIFT_OUTPUT_DIR}/parquet_types.h") set(THRIFT_OUTPUT_FILES ${THRIFT_OUTPUT_FILES} "${THRIFT_OUTPUT_DIR}/parquet_constants.cpp") @@ -683,30 +684,23 @@ add_custom_command( # Library config set(LIBPARQUET_SRCS - src/parquet/exception.cc - src/parquet/types.cc - src/parquet/arrow/reader.cc src/parquet/arrow/record_reader.cc src/parquet/arrow/schema.cc src/parquet/arrow/writer.cc - src/parquet/column_reader.cc src/parquet/column_scanner.cc src/parquet/column_writer.cc - - src/parquet/file/metadata.cc - src/parquet/file/printer.cc - src/parquet/file/reader.cc - src/parquet/file/reader-internal.cc - src/parquet/file/writer.cc - src/parquet/file/writer-internal.cc - - src/parquet/schema.cc - 
src/parquet/statistics.cc - + src/parquet/exception.cc + src/parquet/file_reader.cc + src/parquet/file_writer.cc + src/parquet/metadata.cc src/parquet/parquet_constants.cpp src/parquet/parquet_types.cpp + src/parquet/printer.cc + src/parquet/schema.cc + src/parquet/statistics.cc + src/parquet/types.cc src/parquet/util/comparison.cc src/parquet/util/memory.cc ) @@ -785,7 +779,6 @@ endif() add_subdirectory(src/parquet) add_subdirectory(src/parquet/api) add_subdirectory(src/parquet/arrow) -add_subdirectory(src/parquet/file) add_subdirectory(src/parquet/util) if (NOT MSVC) diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt index a2e283e..bc16d8b 100644 --- a/src/parquet/CMakeLists.txt +++ b/src/parquet/CMakeLists.txt @@ -23,6 +23,10 @@ install(FILES column_writer.h encoding.h exception.h + file_reader.h + file_writer.h + metadata.h + printer.h properties.h schema.h statistics.h @@ -49,9 +53,12 @@ install(FILES ADD_PARQUET_TEST(column_reader-test) ADD_PARQUET_TEST(column_scanner-test) ADD_PARQUET_TEST(column_writer-test) +ADD_PARQUET_TEST(file-deserialize-test) +ADD_PARQUET_TEST(file-serialize-test) ADD_PARQUET_TEST(properties-test) ADD_PARQUET_TEST(statistics-test) ADD_PARQUET_TEST(encoding-test) +ADD_PARQUET_TEST(metadata-test) ADD_PARQUET_TEST(public-api-test) ADD_PARQUET_TEST(types-test) ADD_PARQUET_TEST(reader-test) diff --git a/src/parquet/api/reader.h b/src/parquet/api/reader.h index ba9717a..505654f 100644 --- a/src/parquet/api/reader.h +++ b/src/parquet/api/reader.h @@ -22,11 +22,9 @@ #include "parquet/column_reader.h" #include "parquet/column_scanner.h" #include "parquet/exception.h" -#include "parquet/file/printer.h" -#include "parquet/file/reader.h" - -// Metadata reader API -#include "parquet/file/metadata.h" +#include "parquet/file_reader.h" +#include "parquet/metadata.h" +#include "parquet/printer.h" // Schemas #include "parquet/api/schema.h" diff --git a/src/parquet/api/writer.h b/src/parquet/api/writer.h index cc3ae2a..3b4e42f 100644 --- a/src/parquet/api/writer.h +++ b/src/parquet/api/writer.h @@ -18,15 +18,10 @@ #ifndef PARQUET_API_WRITER_H #define PARQUET_API_WRITER_H -// Column reader API +#include "parquet/api/io.h" +#include "parquet/api/schema.h" #include "parquet/column_writer.h" #include "parquet/exception.h" -#include "parquet/file/writer.h" - -// Schemas -#include "parquet/api/schema.h" - -// IO -#include "parquet/api/io.h" +#include "parquet/file_writer.h" #endif // PARQUET_API_WRITER_H diff --git a/src/parquet/arrow/arrow-reader-writer-benchmark.cc b/src/parquet/arrow/arrow-reader-writer-benchmark.cc index edeef1e..15d2cf7 100644 --- a/src/parquet/arrow/arrow-reader-writer-benchmark.cc +++ b/src/parquet/arrow/arrow-reader-writer-benchmark.cc @@ -23,8 +23,8 @@ #include "parquet/arrow/writer.h" #include "parquet/column_reader.h" #include "parquet/column_writer.h" -#include "parquet/file/reader-internal.h" -#include "parquet/file/writer-internal.h" +#include "parquet/file_reader.h" +#include "parquet/file_writer.h" #include "parquet/util/memory.h" #include "arrow/api.h" @@ -142,7 +142,6 @@ std::shared_ptr<::arrow::Table> TableFromVector(const std::vector static void BM_WriteColumn(::benchmark::State& state) { - format::ColumnChunk thrift_metadata; std::vector values(BENCHMARK_SIZE, 128); std::shared_ptr<::arrow::Table> table = TableFromVector(values, nullable); diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc index a8d3824..c10b164 100644 --- a/src/parquet/arrow/arrow-reader-writer-test.cc +++ 
b/src/parquet/arrow/arrow-reader-writer-test.cc @@ -34,7 +34,7 @@ #include "parquet/arrow/test-util.h" #include "parquet/arrow/writer.h" -#include "parquet/file/writer.h" +#include "parquet/file_writer.h" #include "arrow/api.h" #include "arrow/test-util.h" diff --git a/src/parquet/arrow/record_reader.h b/src/parquet/arrow/record_reader.h index 8d55f9d..9ca8b68 100644 --- a/src/parquet/arrow/record_reader.h +++ b/src/parquet/arrow/record_reader.h @@ -30,7 +30,7 @@ #include #include -#include "parquet/column_page.h" +#include "parquet/column_reader.h" #include "parquet/schema.h" #include "parquet/util/visibility.h" diff --git a/src/parquet/column-io-benchmark.cc b/src/parquet/column-io-benchmark.cc index ec7b52e..7c8d093 100644 --- a/src/parquet/column-io-benchmark.cc +++ b/src/parquet/column-io-benchmark.cc @@ -19,8 +19,8 @@ #include "parquet/column_reader.h" #include "parquet/column_writer.h" -#include "parquet/file/reader-internal.h" -#include "parquet/file/writer-internal.h" +#include "parquet/file_reader.h" +#include "parquet/parquet_types.h" #include "parquet/util/memory.h" namespace parquet { @@ -33,8 +33,8 @@ std::unique_ptr BuildWriter(int64_t output_size, OutputStream* dst, ColumnChunkMetaDataBuilder* metadata, ColumnDescriptor* schema, const WriterProperties* properties) { - std::unique_ptr pager( - new SerializedPageWriter(dst, Compression::UNCOMPRESSED, metadata)); + std::unique_ptr pager = + PageWriter::Open(dst, Compression::UNCOMPRESSED, metadata); return std::unique_ptr( new Int64Writer(metadata, std::move(pager), Encoding::PLAIN, properties)); } @@ -110,8 +110,8 @@ BENCHMARK_TEMPLATE(BM_WriteInt64Column, Repetition::REPEATED, Compression::ZSTD) std::unique_ptr BuildReader(std::shared_ptr& buffer, int64_t num_values, ColumnDescriptor* schema) { std::unique_ptr source(new InMemoryInputStream(buffer)); - std::unique_ptr page_reader( - new SerializedPageReader(std::move(source), num_values, Compression::UNCOMPRESSED)); + std::unique_ptr page_reader = + PageReader::Open(std::move(source), num_values, Compression::UNCOMPRESSED); return std::unique_ptr(new Int64Reader(schema, std::move(page_reader))); } diff --git a/src/parquet/column_page.h b/src/parquet/column_page.h index 85e3bb5..c34eee7 100644 --- a/src/parquet/column_page.h +++ b/src/parquet/column_page.h @@ -168,35 +168,6 @@ class DictionaryPage : public Page { bool is_sorted_; }; -// Abstract page iterator interface. 
This way, we can feed column pages to the -// ColumnReader through whatever mechanism we choose -class PageReader { - public: - virtual ~PageReader() {} - - // @returns: shared_ptr(nullptr) on EOS, std::shared_ptr - // containing new Page otherwise - virtual std::shared_ptr NextPage() = 0; -}; - -class PageWriter { - public: - virtual ~PageWriter() {} - - // The Column Writer decides if dictionary encoding is used if set and - // if the dictionary encoding has fallen back to default encoding on reaching dictionary - // page limit - virtual void Close(bool has_dictionary, bool fallback) = 0; - - virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0; - - virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0; - - virtual bool has_compressor() = 0; - - virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0; -}; - } // namespace parquet #endif // PARQUET_COLUMN_PAGE_H diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc index d23e738..91557af 100644 --- a/src/parquet/column_reader.cc +++ b/src/parquet/column_reader.cc @@ -24,11 +24,14 @@ #include #include #include +#include #include #include "parquet/column_page.h" #include "parquet/encoding-internal.h" +#include "parquet/parquet_types.h" #include "parquet/properties.h" +#include "parquet/thrift.h" using arrow::MemoryPool; @@ -89,6 +92,177 @@ ReaderProperties default_reader_properties() { return default_reader_properties; } +// ---------------------------------------------------------------------- +// SerializedPageReader deserializes Thrift metadata and pages that have been +// assembled in a serialized stream for storing in a Parquet files + +// This subclass delimits pages appearing in a serialized stream, each preceded +// by a serialized Thrift format::PageHeader indicating the type of each page +// and the page metadata. +class SerializedPageReader : public PageReader { + public: + SerializedPageReader(std::unique_ptr stream, int64_t total_num_rows, + Compression::type codec, ::arrow::MemoryPool* pool) + : stream_(std::move(stream)), + decompression_buffer_(AllocateBuffer(pool, 0)), + seen_num_rows_(0), + total_num_rows_(total_num_rows) { + max_page_header_size_ = kDefaultMaxPageHeaderSize; + decompressor_ = GetCodecFromArrow(codec); + } + + virtual ~SerializedPageReader() {} + + // Implement the PageReader interface + std::shared_ptr NextPage() override; + + void set_max_page_header_size(uint32_t size) override { max_page_header_size_ = size; } + + private: + std::unique_ptr stream_; + + format::PageHeader current_page_header_; + std::shared_ptr current_page_; + + // Compression codec to use. 
+ std::unique_ptr<::arrow::Codec> decompressor_; + std::shared_ptr decompression_buffer_; + + // Maximum allowed page size + uint32_t max_page_header_size_; + + // Number of rows read in data pages so far + int64_t seen_num_rows_; + + // Number of rows in all the data pages + int64_t total_num_rows_; +}; + +std::shared_ptr SerializedPageReader::NextPage() { + // Loop here because there may be unhandled page types that we skip until + // finding a page that we do know what to do with + while (seen_num_rows_ < total_num_rows_) { + int64_t bytes_read = 0; + int64_t bytes_available = 0; + uint32_t header_size = 0; + const uint8_t* buffer; + uint32_t allowed_page_size = kDefaultPageHeaderSize; + + // Page headers can be very large because of page statistics + // We try to deserialize a larger buffer progressively + // until a maximum allowed header limit + while (true) { + buffer = stream_->Peek(allowed_page_size, &bytes_available); + if (bytes_available == 0) { + return std::shared_ptr(nullptr); + } + + // This gets used, then set by DeserializeThriftMsg + header_size = static_cast(bytes_available); + try { + DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_); + break; + } catch (std::exception& e) { + // Failed to deserialize. Double the allowed page header size and try again + std::stringstream ss; + ss << e.what(); + allowed_page_size *= 2; + if (allowed_page_size > max_page_header_size_) { + ss << "Deserializing page header failed.\n"; + throw ParquetException(ss.str()); + } + } + } + // Advance the stream offset + stream_->Advance(header_size); + + int compressed_len = current_page_header_.compressed_page_size; + int uncompressed_len = current_page_header_.uncompressed_page_size; + + // Read the compressed data page. + buffer = stream_->Read(compressed_len, &bytes_read); + if (bytes_read != compressed_len) { + ParquetException::EofException(); + } + + // Uncompress it if we need to + if (decompressor_ != nullptr) { + // Grow the uncompressed buffer if we need to. + if (uncompressed_len > static_cast(decompression_buffer_->size())) { + PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); + } + PARQUET_THROW_NOT_OK( + decompressor_->Decompress(compressed_len, buffer, uncompressed_len, + decompression_buffer_->mutable_data())); + buffer = decompression_buffer_->data(); + } + + auto page_buffer = std::make_shared(buffer, uncompressed_len); + + if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) { + const format::DictionaryPageHeader& dict_header = + current_page_header_.dictionary_page_header; + + bool is_sorted = dict_header.__isset.is_sorted ? 
dict_header.is_sorted : false; + + return std::make_shared(page_buffer, dict_header.num_values, + FromThrift(dict_header.encoding), + is_sorted); + } else if (current_page_header_.type == format::PageType::DATA_PAGE) { + const format::DataPageHeader& header = current_page_header_.data_page_header; + + EncodedStatistics page_statistics; + if (header.__isset.statistics) { + const format::Statistics& stats = header.statistics; + if (stats.__isset.max) { + page_statistics.set_max(stats.max); + } + if (stats.__isset.min) { + page_statistics.set_min(stats.min); + } + if (stats.__isset.null_count) { + page_statistics.set_null_count(stats.null_count); + } + if (stats.__isset.distinct_count) { + page_statistics.set_distinct_count(stats.distinct_count); + } + } + + seen_num_rows_ += header.num_values; + + return std::make_shared( + page_buffer, header.num_values, FromThrift(header.encoding), + FromThrift(header.definition_level_encoding), + FromThrift(header.repetition_level_encoding), page_statistics); + } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) { + const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2; + bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false; + + seen_num_rows_ += header.num_values; + + return std::make_shared( + page_buffer, header.num_values, header.num_nulls, header.num_rows, + FromThrift(header.encoding), header.definition_levels_byte_length, + header.repetition_levels_byte_length, is_compressed); + } else { + // We don't know what this page type is. We're allowed to skip non-data + // pages. + continue; + } + } + return std::shared_ptr(nullptr); +} + +std::unique_ptr PageReader::Open(std::unique_ptr stream, + int64_t total_num_rows, + Compression::type codec, + ::arrow::MemoryPool* pool) { + return std::unique_ptr( + new SerializedPageReader(std::move(stream), total_num_rows, codec, pool)); +} + +// ---------------------------------------------------------------------- + ColumnReader::ColumnReader(const ColumnDescriptor* descr, std::unique_ptr pager, MemoryPool* pool) : descr_(descr), diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h index dcf41e8..6158cb3 100644 --- a/src/parquet/column_reader.h +++ b/src/parquet/column_reader.h @@ -49,6 +49,12 @@ class RleDecoder; namespace parquet { +// 16 MB is the default maximum page header size +static constexpr uint32_t kDefaultMaxPageHeaderSize = 16 * 1024 * 1024; + +// 16 KB is the default expected page header size +static constexpr uint32_t kDefaultPageHeaderSize = 16 * 1024; + namespace BitUtil = ::arrow::BitUtil; class PARQUET_EXPORT LevelDecoder { @@ -72,6 +78,24 @@ class PARQUET_EXPORT LevelDecoder { std::unique_ptr<::arrow::BitReader> bit_packed_decoder_; }; +// Abstract page iterator interface. 
This way, we can feed column pages to the +// ColumnReader through whatever mechanism we choose +class PARQUET_EXPORT PageReader { + public: + virtual ~PageReader() = default; + + static std::unique_ptr Open( + std::unique_ptr stream, int64_t total_num_rows, + Compression::type codec, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + + // @returns: shared_ptr(nullptr) on EOS, std::shared_ptr + // containing new Page otherwise + virtual std::shared_ptr NextPage() = 0; + + virtual void set_max_page_header_size(uint32_t size) = 0; +}; + class PARQUET_EXPORT ColumnReader { public: ColumnReader(const ColumnDescriptor*, std::unique_ptr, diff --git a/src/parquet/column_writer-test.cc b/src/parquet/column_writer-test.cc index 681f022..b4e3232 100644 --- a/src/parquet/column_writer-test.cc +++ b/src/parquet/column_writer-test.cc @@ -19,8 +19,7 @@ #include "parquet/column_reader.h" #include "parquet/column_writer.h" -#include "parquet/file/reader-internal.h" -#include "parquet/file/writer-internal.h" +#include "parquet/parquet_types.h" #include "parquet/test-specialization.h" #include "parquet/test-util.h" #include "parquet/types.h" @@ -63,8 +62,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { Compression::type compression = Compression::UNCOMPRESSED) { auto buffer = sink_->GetBuffer(); std::unique_ptr source(new InMemoryInputStream(buffer)); - std::unique_ptr page_reader( - new SerializedPageReader(std::move(source), num_rows, compression)); + std::unique_ptr page_reader = + PageReader::Open(std::move(source), num_rows, compression); reader_.reset(new TypedColumnReader(this->descr_, std::move(page_reader))); } @@ -74,8 +73,8 @@ class TestPrimitiveWriter : public PrimitiveTypedTest { sink_.reset(new InMemoryOutputStream()); metadata_ = ColumnChunkMetaDataBuilder::Make( writer_properties_, this->descr_, reinterpret_cast(&thrift_metadata_)); - std::unique_ptr pager( - new SerializedPageWriter(sink_.get(), column_properties.codec, metadata_.get())); + std::unique_ptr pager = + PageWriter::Open(sink_.get(), column_properties.codec, metadata_.get()); WriterProperties::Builder wp_builder; if (column_properties.encoding == Encoding::PLAIN_DICTIONARY || column_properties.encoding == Encoding::RLE_DICTIONARY) { diff --git a/src/parquet/column_writer.cc b/src/parquet/column_writer.cc index 4aadf2b..bdaa9f6 100644 --- a/src/parquet/column_writer.cc +++ b/src/parquet/column_writer.cc @@ -17,12 +17,17 @@ #include "parquet/column_writer.h" +#include +#include + #include "arrow/util/bit-util.h" +#include "arrow/util/compression.h" #include "arrow/util/rle-encoding.h" #include "parquet/encoding-internal.h" #include "parquet/properties.h" #include "parquet/statistics.h" +#include "parquet/thrift.h" #include "parquet/util/logging.h" #include "parquet/util/memory.h" @@ -104,6 +109,169 @@ int LevelEncoder::Encode(int batch_size, const int16_t* levels) { } // ---------------------------------------------------------------------- +// PageWriter implementation + +static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) { + format::Statistics statistics; + if (row_group_statistics.has_min) statistics.__set_min(row_group_statistics.min()); + if (row_group_statistics.has_max) statistics.__set_max(row_group_statistics.max()); + if (row_group_statistics.has_null_count) + statistics.__set_null_count(row_group_statistics.null_count); + if (row_group_statistics.has_distinct_count) + statistics.__set_distinct_count(row_group_statistics.distinct_count); + return statistics; +} + 
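The call-site pattern introduced by this change is worth spelling out: SerializedPageReader and SerializedPageWriter are no longer visible in public headers, and callers instead go through the static Open() factories declared on the abstract PageReader and PageWriter interfaces. A minimal sketch of the new reader-side usage, mirroring the benchmark and test updates elsewhere in this diff (the in-memory buffer and row count are illustrative placeholders):

#include <memory>

#include "parquet/column_reader.h"  // PageReader, PageReader::Open
#include "parquet/types.h"          // Compression
#include "parquet/util/memory.h"    // Buffer, InputStream, InMemoryInputStream

namespace parquet {

// Open a PageReader over an in-memory column chunk without naming the
// (now internal) SerializedPageReader implementation class.
std::unique_ptr<PageReader> OpenPageReader(
    const std::shared_ptr<Buffer>& column_chunk, int64_t total_num_rows) {
  std::unique_ptr<InputStream> source(new InMemoryInputStream(column_chunk));
  return PageReader::Open(std::move(source), total_num_rows,
                          Compression::UNCOMPRESSED);
}

}  // namespace parquet

PageWriter::Open follows the same shape on the write side, taking an OutputStream*, a codec, and a ColumnChunkMetaDataBuilder*, as the SerializedPageWriter definition below shows.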
+// This subclass delimits pages appearing in a serialized stream, each preceded +// by a serialized Thrift format::PageHeader indicating the type of each page +// and the page metadata. +class SerializedPageWriter : public PageWriter { + public: + SerializedPageWriter(OutputStream* sink, Compression::type codec, + ColumnChunkMetaDataBuilder* metadata, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()) + : sink_(sink), + metadata_(metadata), + pool_(pool), + num_values_(0), + dictionary_page_offset_(0), + data_page_offset_(0), + total_uncompressed_size_(0), + total_compressed_size_(0) { + compressor_ = GetCodecFromArrow(codec); + } + + virtual ~SerializedPageWriter() = default; + + int64_t WriteDictionaryPage(const DictionaryPage& page) override { + int64_t uncompressed_size = page.size(); + std::shared_ptr compressed_data = nullptr; + if (has_compressor()) { + auto buffer = std::static_pointer_cast( + AllocateBuffer(pool_, uncompressed_size)); + Compress(*(page.buffer().get()), buffer.get()); + compressed_data = std::static_pointer_cast(buffer); + } else { + compressed_data = page.buffer(); + } + + format::DictionaryPageHeader dict_page_header; + dict_page_header.__set_num_values(page.num_values()); + dict_page_header.__set_encoding(ToThrift(page.encoding())); + dict_page_header.__set_is_sorted(page.is_sorted()); + + format::PageHeader page_header; + page_header.__set_type(format::PageType::DICTIONARY_PAGE); + page_header.__set_uncompressed_page_size(static_cast(uncompressed_size)); + page_header.__set_compressed_page_size(static_cast(compressed_data->size())); + page_header.__set_dictionary_page_header(dict_page_header); + // TODO(PARQUET-594) crc checksum + + int64_t start_pos = sink_->Tell(); + if (dictionary_page_offset_ == 0) { + dictionary_page_offset_ = start_pos; + } + int64_t header_size = + SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); + sink_->Write(compressed_data->data(), compressed_data->size()); + + total_uncompressed_size_ += uncompressed_size + header_size; + total_compressed_size_ += compressed_data->size() + header_size; + + return sink_->Tell() - start_pos; + } + + void Close(bool has_dictionary, bool fallback) override { + // index_page_offset = 0 since they are not supported + metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_, + total_compressed_size_, total_uncompressed_size_, has_dictionary, + fallback); + + // Write metadata at end of column chunk + metadata_->WriteTo(sink_); + } + + /** + * Compress a buffer. + */ + void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override { + DCHECK(compressor_ != nullptr); + + // Compress the data + int64_t max_compressed_size = + compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data()); + + // Use Arrow::Buffer::shrink_to_fit = false + // underlying buffer only keeps growing. Resize to a smaller size does not reallocate. 
+ PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false)); + + int64_t compressed_size; + PARQUET_THROW_NOT_OK( + compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size, + dest_buffer->mutable_data(), &compressed_size)); + PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false)); + } + + int64_t WriteDataPage(const CompressedDataPage& page) override { + int64_t uncompressed_size = page.uncompressed_size(); + std::shared_ptr compressed_data = page.buffer(); + + format::DataPageHeader data_page_header; + data_page_header.__set_num_values(page.num_values()); + data_page_header.__set_encoding(ToThrift(page.encoding())); + data_page_header.__set_definition_level_encoding( + ToThrift(page.definition_level_encoding())); + data_page_header.__set_repetition_level_encoding( + ToThrift(page.repetition_level_encoding())); + data_page_header.__set_statistics(ToThrift(page.statistics())); + + format::PageHeader page_header; + page_header.__set_type(format::PageType::DATA_PAGE); + page_header.__set_uncompressed_page_size(static_cast(uncompressed_size)); + page_header.__set_compressed_page_size(static_cast(compressed_data->size())); + page_header.__set_data_page_header(data_page_header); + // TODO(PARQUET-594) crc checksum + + int64_t start_pos = sink_->Tell(); + if (data_page_offset_ == 0) { + data_page_offset_ = start_pos; + } + + int64_t header_size = + SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_); + sink_->Write(compressed_data->data(), compressed_data->size()); + + total_uncompressed_size_ += uncompressed_size + header_size; + total_compressed_size_ += compressed_data->size() + header_size; + num_values_ += page.num_values(); + + return sink_->Tell() - start_pos; + } + + bool has_compressor() override { return (compressor_ != nullptr); } + + private: + OutputStream* sink_; + ColumnChunkMetaDataBuilder* metadata_; + ::arrow::MemoryPool* pool_; + int64_t num_values_; + int64_t dictionary_page_offset_; + int64_t data_page_offset_; + int64_t total_uncompressed_size_; + int64_t total_compressed_size_; + + // Compression codec to use. 
+ std::unique_ptr<::arrow::Codec> compressor_; +}; + +std::unique_ptr PageWriter::Open(OutputStream* sink, Compression::type codec, + ColumnChunkMetaDataBuilder* metadata, + ::arrow::MemoryPool* pool) { + return std::unique_ptr( + new SerializedPageWriter(sink, codec, metadata, pool)); +} + +// ---------------------------------------------------------------------- // ColumnWriter std::shared_ptr default_writer_properties() { diff --git a/src/parquet/column_writer.h b/src/parquet/column_writer.h index 0408747..f1c13a0 100644 --- a/src/parquet/column_writer.h +++ b/src/parquet/column_writer.h @@ -22,7 +22,7 @@ #include "parquet/column_page.h" #include "parquet/encoding.h" -#include "parquet/file/metadata.h" +#include "parquet/metadata.h" #include "parquet/properties.h" #include "parquet/schema.h" #include "parquet/statistics.h" @@ -69,6 +69,28 @@ class PARQUET_EXPORT LevelEncoder { std::unique_ptr<::arrow::BitWriter> bit_packed_encoder_; }; +class PageWriter { + public: + virtual ~PageWriter() {} + + static std::unique_ptr Open( + OutputStream* sink, Compression::type codec, ColumnChunkMetaDataBuilder* metadata, + ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); + + // The Column Writer decides if dictionary encoding is used if set and + // if the dictionary encoding has fallen back to default encoding on reaching dictionary + // page limit + virtual void Close(bool has_dictionary, bool fallback) = 0; + + virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0; + + virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0; + + virtual bool has_compressor() = 0; + + virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0; +}; + static constexpr int WRITE_BATCH_SIZE = 1000; class PARQUET_EXPORT ColumnWriter { public: diff --git a/src/parquet/encoding-benchmark.cc b/src/parquet/encoding-benchmark.cc index 97eeefa..72c41e5 100644 --- a/src/parquet/encoding-benchmark.cc +++ b/src/parquet/encoding-benchmark.cc @@ -18,7 +18,6 @@ #include "benchmark/benchmark.h" #include "parquet/encoding-internal.h" -#include "parquet/file/reader-internal.h" #include "parquet/util/memory.h" using arrow::default_memory_pool; @@ -26,7 +25,6 @@ using arrow::MemoryPool; namespace parquet { -using format::ColumnChunk; using schema::PrimitiveNode; namespace benchmark { diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file-deserialize-test.cc similarity index 96% rename from src/parquet/file/file-deserialize-test.cc rename to src/parquet/file-deserialize-test.cc index 0cab75f..5e17375 100644 --- a/src/parquet/file/file-deserialize-test.cc +++ b/src/parquet/file-deserialize-test.cc @@ -26,9 +26,9 @@ #include #include -#include "parquet/column_page.h" +#include "parquet/column_reader.h" #include "parquet/exception.h" -#include "parquet/file/reader-internal.h" +#include "parquet/file_reader.h" #include "parquet/parquet_types.h" #include "parquet/thrift.h" #include "parquet/types.h" @@ -73,7 +73,7 @@ class TestPageSerde : public ::testing::Test { EndStream(); std::unique_ptr stream; stream.reset(new InMemoryInputStream(out_buffer_)); - page_reader_.reset(new SerializedPageReader(std::move(stream), num_rows, codec)); + page_reader_ = PageReader::Open(std::move(stream), num_rows, codec); } void WriteDataPageHeader(int max_serialized_len = 1024, int32_t uncompressed_size = 0, @@ -99,7 +99,7 @@ class TestPageSerde : public ::testing::Test { std::unique_ptr out_stream_; std::shared_ptr out_buffer_; - std::unique_ptr page_reader_; + std::unique_ptr 
page_reader_; format::PageHeader page_header_; format::DataPageHeader data_page_header_; }; @@ -149,7 +149,7 @@ TEST_F(TestPageSerde, TestLargePageHeaders) { // check header size is between 256 KB to 16 MB ASSERT_LE(stats_size, out_stream_->Tell()); - ASSERT_GE(DEFAULT_MAX_PAGE_HEADER_SIZE, out_stream_->Tell()); + ASSERT_GE(kDefaultMaxPageHeaderSize, out_stream_->Tell()); InitSerializedPageReader(num_rows); std::shared_ptr current_page = page_reader_->NextPage(); @@ -249,7 +249,7 @@ class TestParquetFileReader : public ::testing::Test { auto reader = std::make_shared(buffer); auto wrapper = std::unique_ptr(new ArrowInputFile(reader)); - ASSERT_THROW(reader_->Open(SerializedFile::Open(std::move(wrapper))), + ASSERT_THROW(reader_->Open(ParquetFileReader::Contents::Open(std::move(wrapper))), ParquetException); } diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file-serialize-test.cc similarity index 99% rename from src/parquet/file/file-serialize-test.cc rename to src/parquet/file-serialize-test.cc index 4d94d2e..b4df77e 100644 --- a/src/parquet/file/file-serialize-test.cc +++ b/src/parquet/file-serialize-test.cc @@ -19,8 +19,8 @@ #include "parquet/column_reader.h" #include "parquet/column_writer.h" -#include "parquet/file/reader.h" -#include "parquet/file/writer.h" +#include "parquet/file_reader.h" +#include "parquet/file_writer.h" #include "parquet/test-specialization.h" #include "parquet/test-util.h" #include "parquet/types.h" diff --git a/src/parquet/file/CMakeLists.txt b/src/parquet/file/CMakeLists.txt deleted file mode 100644 index 82e7c80..0000000 --- a/src/parquet/file/CMakeLists.txt +++ /dev/null @@ -1,27 +0,0 @@ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, -# software distributed under the License is distributed on an -# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -# KIND, either express or implied. See the License for the -# specific language governing permissions and limitations -# under the License. - -install(FILES - metadata.h - printer.h - reader.h - writer.h - DESTINATION "${CMAKE_INSTALL_INCLUDEDIR}/parquet/file") - -ADD_PARQUET_TEST(file-deserialize-test) -ADD_PARQUET_TEST(file-metadata-test) -ADD_PARQUET_TEST(file-serialize-test) diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc deleted file mode 100644 index bc14ec9..0000000 --- a/src/parquet/file/reader-internal.cc +++ /dev/null @@ -1,309 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. 
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet/file/reader-internal.h" - -#include -#include -#include -#include -#include -#include - -#include "arrow/util/compression.h" - -#include "parquet/column_page.h" -#include "parquet/exception.h" -#include "parquet/schema.h" -#include "parquet/thrift.h" -#include "parquet/types.h" -#include "parquet/util/memory.h" - -using arrow::MemoryPool; - -namespace parquet { - -// ---------------------------------------------------------------------- -// SerializedPageReader deserializes Thrift metadata and pages that have been -// assembled in a serialized stream for storing in a Parquet files - -SerializedPageReader::SerializedPageReader(std::unique_ptr stream, - int64_t total_num_rows, - Compression::type codec, MemoryPool* pool) - : stream_(std::move(stream)), - decompression_buffer_(AllocateBuffer(pool, 0)), - seen_num_rows_(0), - total_num_rows_(total_num_rows) { - max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE; - decompressor_ = GetCodecFromArrow(codec); -} - -std::shared_ptr SerializedPageReader::NextPage() { - // Loop here because there may be unhandled page types that we skip until - // finding a page that we do know what to do with - while (seen_num_rows_ < total_num_rows_) { - int64_t bytes_read = 0; - int64_t bytes_available = 0; - uint32_t header_size = 0; - const uint8_t* buffer; - uint32_t allowed_page_size = DEFAULT_PAGE_HEADER_SIZE; - - // Page headers can be very large because of page statistics - // We try to deserialize a larger buffer progressively - // until a maximum allowed header limit - while (true) { - buffer = stream_->Peek(allowed_page_size, &bytes_available); - if (bytes_available == 0) { - return std::shared_ptr(nullptr); - } - - // This gets used, then set by DeserializeThriftMsg - header_size = static_cast(bytes_available); - try { - DeserializeThriftMsg(buffer, &header_size, ¤t_page_header_); - break; - } catch (std::exception& e) { - // Failed to deserialize. Double the allowed page header size and try again - std::stringstream ss; - ss << e.what(); - allowed_page_size *= 2; - if (allowed_page_size > max_page_header_size_) { - ss << "Deserializing page header failed.\n"; - throw ParquetException(ss.str()); - } - } - } - // Advance the stream offset - stream_->Advance(header_size); - - int compressed_len = current_page_header_.compressed_page_size; - int uncompressed_len = current_page_header_.uncompressed_page_size; - - // Read the compressed data page. - buffer = stream_->Read(compressed_len, &bytes_read); - if (bytes_read != compressed_len) { - ParquetException::EofException(); - } - - // Uncompress it if we need to - if (decompressor_ != nullptr) { - // Grow the uncompressed buffer if we need to. 
- if (uncompressed_len > static_cast(decompression_buffer_->size())) { - PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len, false)); - } - PARQUET_THROW_NOT_OK( - decompressor_->Decompress(compressed_len, buffer, uncompressed_len, - decompression_buffer_->mutable_data())); - buffer = decompression_buffer_->data(); - } - - auto page_buffer = std::make_shared(buffer, uncompressed_len); - - if (current_page_header_.type == format::PageType::DICTIONARY_PAGE) { - const format::DictionaryPageHeader& dict_header = - current_page_header_.dictionary_page_header; - - bool is_sorted = dict_header.__isset.is_sorted ? dict_header.is_sorted : false; - - return std::make_shared(page_buffer, dict_header.num_values, - FromThrift(dict_header.encoding), - is_sorted); - } else if (current_page_header_.type == format::PageType::DATA_PAGE) { - const format::DataPageHeader& header = current_page_header_.data_page_header; - - EncodedStatistics page_statistics; - if (header.__isset.statistics) { - const format::Statistics& stats = header.statistics; - if (stats.__isset.max) { - page_statistics.set_max(stats.max); - } - if (stats.__isset.min) { - page_statistics.set_min(stats.min); - } - if (stats.__isset.null_count) { - page_statistics.set_null_count(stats.null_count); - } - if (stats.__isset.distinct_count) { - page_statistics.set_distinct_count(stats.distinct_count); - } - } - - seen_num_rows_ += header.num_values; - - return std::make_shared( - page_buffer, header.num_values, FromThrift(header.encoding), - FromThrift(header.definition_level_encoding), - FromThrift(header.repetition_level_encoding), page_statistics); - } else if (current_page_header_.type == format::PageType::DATA_PAGE_V2) { - const format::DataPageHeaderV2& header = current_page_header_.data_page_header_v2; - bool is_compressed = header.__isset.is_compressed ? header.is_compressed : false; - - seen_num_rows_ += header.num_values; - - return std::make_shared( - page_buffer, header.num_values, header.num_nulls, header.num_rows, - FromThrift(header.encoding), header.definition_levels_byte_length, - header.repetition_levels_byte_length, is_compressed); - } else { - // We don't know what this page type is. We're allowed to skip non-data - // pages. 
- continue; - } - } - return std::shared_ptr(nullptr); -} - -SerializedRowGroup::SerializedRowGroup(RandomAccessSource* source, - FileMetaData* file_metadata, int row_group_number, - const ReaderProperties& props) - : source_(source), file_metadata_(file_metadata), properties_(props) { - row_group_metadata_ = file_metadata->RowGroup(row_group_number); -} -const RowGroupMetaData* SerializedRowGroup::metadata() const { - return row_group_metadata_.get(); -} - -const ReaderProperties* SerializedRowGroup::properties() const { return &properties_; } - -// For PARQUET-816 -static constexpr int64_t kMaxDictHeaderSize = 100; - -std::unique_ptr SerializedRowGroup::GetColumnPageReader(int i) { - // Read column chunk from the file - auto col = row_group_metadata_->ColumnChunk(i); - - int64_t col_start = col->data_page_offset(); - if (col->has_dictionary_page() && col_start > col->dictionary_page_offset()) { - col_start = col->dictionary_page_offset(); - } - - int64_t col_length = col->total_compressed_size(); - std::unique_ptr stream; - - // PARQUET-816 workaround for old files created by older parquet-mr - const ApplicationVersion& version = file_metadata_->writer_version(); - if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION)) { - // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the - // dictionary page header size in total_compressed_size and total_uncompressed_size - // (see IMPALA-694). We add padding to compensate. - int64_t bytes_remaining = source_->Size() - (col_start + col_length); - int64_t padding = std::min(kMaxDictHeaderSize, bytes_remaining); - col_length += padding; - } - - stream = properties_.GetStream(source_, col_start, col_length); - - return std::unique_ptr( - new SerializedPageReader(std::move(stream), col->num_values(), col->compression(), - properties_.memory_pool())); -} - -// ---------------------------------------------------------------------- -// SerializedFile: Parquet on-disk layout - -// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file -static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024; -static constexpr uint32_t FOOTER_SIZE = 8; -static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'}; - -std::unique_ptr SerializedFile::Open( - std::unique_ptr source, const ReaderProperties& props, - const std::shared_ptr& metadata) { - std::unique_ptr result( - new SerializedFile(std::move(source), props)); - - // Access private methods here, but otherwise unavailable - SerializedFile* file = static_cast(result.get()); - - if (metadata == nullptr) { - // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor - file->ParseMetaData(); - } else { - file->file_metadata_ = metadata; - } - - return result; -} - -void SerializedFile::Close() { source_->Close(); } - -SerializedFile::~SerializedFile() { - try { - Close(); - } catch (...) 
{ - } -} - -std::shared_ptr SerializedFile::GetRowGroup(int i) { - std::unique_ptr contents( - new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_)); - return std::make_shared(std::move(contents)); -} - -std::shared_ptr SerializedFile::metadata() const { return file_metadata_; } - -SerializedFile::SerializedFile( - std::unique_ptr source, - const ReaderProperties& props = default_reader_properties()) - : source_(std::move(source)), properties_(props) {} - -void SerializedFile::ParseMetaData() { - int64_t file_size = source_->Size(); - - if (file_size < FOOTER_SIZE) { - throw ParquetException("Corrupted file, smaller than file footer"); - } - - uint8_t footer_buffer[DEFAULT_FOOTER_READ_SIZE]; - int64_t footer_read_size = std::min(file_size, DEFAULT_FOOTER_READ_SIZE); - int64_t bytes_read = - source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer); - - // Check if all bytes are read. Check if last 4 bytes read have the magic bits - if (bytes_read != footer_read_size || - memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) { - throw ParquetException("Invalid parquet file. Corrupt footer."); - } - - uint32_t metadata_len = - *reinterpret_cast(footer_buffer + footer_read_size - FOOTER_SIZE); - int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len; - if (FOOTER_SIZE + metadata_len > file_size) { - throw ParquetException( - "Invalid parquet file. File is less than " - "file metadata size."); - } - - std::shared_ptr metadata_buffer = - AllocateBuffer(properties_.memory_pool(), metadata_len); - - // Check if the footer_buffer contains the entire metadata - if (footer_read_size >= (metadata_len + FOOTER_SIZE)) { - memcpy(metadata_buffer->mutable_data(), - footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE), metadata_len); - } else { - bytes_read = - source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data()); - if (bytes_read != metadata_len) { - throw ParquetException("Invalid parquet file. Could not read metadata bytes."); - } - } - - file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len); -} - -} // namespace parquet diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h deleted file mode 100644 index 282c534..0000000 --- a/src/parquet/file/reader-internal.h +++ /dev/null @@ -1,133 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. 
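The footer handling in SerializedFile::ParseMetaData above (which this commit moves into file_reader.cc) leans entirely on the fixed Parquet footer layout: the last 8 bytes of a file are a 4-byte metadata length followed by the 4-byte magic "PAR1". A standalone sketch of just that check, under the same little-endian-host assumption the real code's reinterpret_cast makes (ParseFooter is a hypothetical helper, not part of the library):

#include <cstdint>
#include <cstring>
#include <stdexcept>

// Validate the last 8 bytes of a Parquet file and return the length of
// the serialized Thrift FileMetaData that immediately precedes them.
uint32_t ParseFooter(const uint8_t footer[8], int64_t file_size) {
  if (std::memcmp(footer + 4, "PAR1", 4) != 0) {
    throw std::runtime_error("Invalid parquet file. Corrupt footer.");
  }
  uint32_t metadata_len;
  std::memcpy(&metadata_len, footer, sizeof(metadata_len));  // little-endian host assumed
  if (static_cast<int64_t>(metadata_len) + 8 > file_size) {
    throw std::runtime_error(
        "Invalid parquet file. File is less than file metadata size.");
  }
  return metadata_len;
}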
- -#ifndef PARQUET_FILE_READER_INTERNAL_H -#define PARQUET_FILE_READER_INTERNAL_H - -#include -#include -#include - -#include "parquet/column_page.h" -#include "parquet/file/metadata.h" -#include "parquet/file/reader.h" -#include "parquet/parquet_types.h" -#include "parquet/properties.h" -#include "parquet/types.h" -#include "parquet/util/memory.h" -#include "parquet/util/visibility.h" - -namespace arrow { - -class Codec; -}; - -namespace parquet { - -// 16 MB is the default maximum page header size -static constexpr uint32_t DEFAULT_MAX_PAGE_HEADER_SIZE = 16 * 1024 * 1024; - -// 16 KB is the default expected page header size -static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024; - -// This subclass delimits pages appearing in a serialized stream, each preceded -// by a serialized Thrift format::PageHeader indicating the type of each page -// and the page metadata. -class PARQUET_EXPORT SerializedPageReader : public PageReader { - public: - SerializedPageReader(std::unique_ptr stream, int64_t num_rows, - Compression::type codec, - ::arrow::MemoryPool* pool = ::arrow::default_memory_pool()); - - virtual ~SerializedPageReader() {} - - // Implement the PageReader interface - virtual std::shared_ptr NextPage(); - - void set_max_page_header_size(uint32_t size) { max_page_header_size_ = size; } - - private: - std::unique_ptr stream_; - - format::PageHeader current_page_header_; - std::shared_ptr current_page_; - - // Compression codec to use. - std::unique_ptr<::arrow::Codec> decompressor_; - std::shared_ptr decompression_buffer_; - - // Maximum allowed page size - uint32_t max_page_header_size_; - - // Number of rows read in data pages so far - int64_t seen_num_rows_; - - // Number of rows in all the data pages - int64_t total_num_rows_; -}; - -// RowGroupReader::Contents implementation for the Parquet file specification -class PARQUET_EXPORT SerializedRowGroup : public RowGroupReader::Contents { - public: - SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata, - int row_group_number, const ReaderProperties& props); - - virtual const RowGroupMetaData* metadata() const; - - virtual const ReaderProperties* properties() const; - - virtual std::unique_ptr GetColumnPageReader(int i); - - private: - RandomAccessSource* source_; - FileMetaData* file_metadata_; - std::unique_ptr row_group_metadata_; - ReaderProperties properties_; -}; - -// An implementation of ParquetFileReader::Contents that deals with the Parquet -// file structure, Thrift deserialization, and other internal matters - -class PARQUET_EXPORT SerializedFile : public ParquetFileReader::Contents { - public: - // Open the file. 
-  // If no metadata is passed, it is parsed from the footer of
-  // the file
-  static std::unique_ptr<ParquetFileReader::Contents> Open(
-      std::unique_ptr<RandomAccessSource> source,
-      const ReaderProperties& props = default_reader_properties(),
-      const std::shared_ptr<FileMetaData>& metadata = nullptr);
-
-  void Close() override;
-  std::shared_ptr<RowGroupReader> GetRowGroup(int i) override;
-  std::shared_ptr<FileMetaData> metadata() const override;
-  virtual ~SerializedFile();
-
- private:
-  // This class takes ownership of the provided data source
-  explicit SerializedFile(std::unique_ptr<RandomAccessSource> source,
-                          const ReaderProperties& props);
-
-  std::unique_ptr<RandomAccessSource> source_;
-  std::shared_ptr<FileMetaData> file_metadata_;
-  ReaderProperties properties_;
-
-  void ParseMetaData();
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_FILE_READER_INTERNAL_H
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
deleted file mode 100644
index 712289b..0000000
--- a/src/parquet/file/writer-internal.cc
+++ /dev/null
@@ -1,327 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/writer-internal.h"
-
-#include
-#include
-
-#include "arrow/util/compression.h"
-
-#include "parquet/column_writer.h"
-#include "parquet/schema-internal.h"
-#include "parquet/schema.h"
-#include "parquet/thrift.h"
-#include "parquet/util/memory.h"
-
-using arrow::MemoryPool;
-
-using parquet::schema::GroupNode;
-using parquet::schema::SchemaFlattener;
-
-namespace parquet {
-
-// FIXME: copied from reader-internal.cc
-static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-// ----------------------------------------------------------------------
-// SerializedPageWriter
-
-SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type codec,
-                                           ColumnChunkMetaDataBuilder* metadata,
-                                           MemoryPool* pool)
-    : sink_(sink),
-      metadata_(metadata),
-      pool_(pool),
-      num_values_(0),
-      dictionary_page_offset_(0),
-      data_page_offset_(0),
-      total_uncompressed_size_(0),
-      total_compressed_size_(0) {
-  compressor_ = GetCodecFromArrow(codec);
-}
-
-static format::Statistics ToThrift(const EncodedStatistics& row_group_statistics) {
-  format::Statistics statistics;
-  if (row_group_statistics.has_min) statistics.__set_min(row_group_statistics.min());
-  if (row_group_statistics.has_max) statistics.__set_max(row_group_statistics.max());
-  if (row_group_statistics.has_null_count)
-    statistics.__set_null_count(row_group_statistics.null_count);
-  if (row_group_statistics.has_distinct_count)
-    statistics.__set_distinct_count(row_group_statistics.distinct_count);
-  return statistics;
-}
-
-void SerializedPageWriter::Close(bool has_dictionary, bool fallback) {
-  // index_page_offset = 0 since they are not supported
-  metadata_->Finish(num_values_, dictionary_page_offset_, 0, data_page_offset_,
-                    total_compressed_size_, total_uncompressed_size_, has_dictionary,
-                    fallback);
-
-  // Write metadata at end of column chunk
-  metadata_->WriteTo(sink_);
-}
-
-void SerializedPageWriter::Compress(const Buffer& src_buffer,
-                                    ResizableBuffer* dest_buffer) {
-  DCHECK(compressor_ != nullptr);
-
-  // Compress the data
-  int64_t max_compressed_size =
-      compressor_->MaxCompressedLen(src_buffer.size(), src_buffer.data());
-
-  // Use Arrow::Buffer::shrink_to_fit = false
-  // underlying buffer only keeps growing. Resize to a smaller size does not reallocate.
-  PARQUET_THROW_NOT_OK(dest_buffer->Resize(max_compressed_size, false));
-
-  int64_t compressed_size;
-  PARQUET_THROW_NOT_OK(
-      compressor_->Compress(src_buffer.size(), src_buffer.data(), max_compressed_size,
-                            dest_buffer->mutable_data(), &compressed_size));
-  PARQUET_THROW_NOT_OK(dest_buffer->Resize(compressed_size, false));
-}
-
-int64_t SerializedPageWriter::WriteDataPage(const CompressedDataPage& page) {
-  int64_t uncompressed_size = page.uncompressed_size();
-  std::shared_ptr<Buffer> compressed_data = page.buffer();
-
-  format::DataPageHeader data_page_header;
-  data_page_header.__set_num_values(page.num_values());
-  data_page_header.__set_encoding(ToThrift(page.encoding()));
-  data_page_header.__set_definition_level_encoding(
-      ToThrift(page.definition_level_encoding()));
-  data_page_header.__set_repetition_level_encoding(
-      ToThrift(page.repetition_level_encoding()));
-  data_page_header.__set_statistics(ToThrift(page.statistics()));
-
-  format::PageHeader page_header;
-  page_header.__set_type(format::PageType::DATA_PAGE);
-  page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
-  page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
-  page_header.__set_data_page_header(data_page_header);
-  // TODO(PARQUET-594) crc checksum
-
-  int64_t start_pos = sink_->Tell();
-  if (data_page_offset_ == 0) {
-    data_page_offset_ = start_pos;
-  }
-
-  int64_t header_size =
-      SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
-  sink_->Write(compressed_data->data(), compressed_data->size());
-
-  total_uncompressed_size_ += uncompressed_size + header_size;
-  total_compressed_size_ += compressed_data->size() + header_size;
-  num_values_ += page.num_values();
-
-  return sink_->Tell() - start_pos;
-}
-
-int64_t SerializedPageWriter::WriteDictionaryPage(const DictionaryPage& page) {
-  int64_t uncompressed_size = page.size();
-  std::shared_ptr<Buffer> compressed_data = nullptr;
-  if (has_compressor()) {
-    auto buffer = std::static_pointer_cast<ResizableBuffer>(
-        AllocateBuffer(pool_, uncompressed_size));
-    Compress(*(page.buffer().get()), buffer.get());
-    compressed_data = std::static_pointer_cast<Buffer>(buffer);
-  } else {
-    compressed_data = page.buffer();
-  }
-
-  format::DictionaryPageHeader dict_page_header;
-  dict_page_header.__set_num_values(page.num_values());
-  dict_page_header.__set_encoding(ToThrift(page.encoding()));
-  dict_page_header.__set_is_sorted(page.is_sorted());
-
-  format::PageHeader page_header;
-  page_header.__set_type(format::PageType::DICTIONARY_PAGE);
-  page_header.__set_uncompressed_page_size(static_cast<int32_t>(uncompressed_size));
-  page_header.__set_compressed_page_size(static_cast<int32_t>(compressed_data->size()));
-  page_header.__set_dictionary_page_header(dict_page_header);
-  // TODO(PARQUET-594) crc checksum
-
-  int64_t start_pos = sink_->Tell();
-  if (dictionary_page_offset_ == 0) {
-    dictionary_page_offset_ = start_pos;
-  }
-  int64_t header_size =
-      SerializeThriftMsg(&page_header, sizeof(format::PageHeader), sink_);
-  sink_->Write(compressed_data->data(), compressed_data->size());
-
-  total_uncompressed_size_ += uncompressed_size + header_size;
-  total_compressed_size_ += compressed_data->size() + header_size;
-
-  return sink_->Tell() - start_pos;
-}
-
-// ----------------------------------------------------------------------
-// RowGroupSerializer
-
-int RowGroupSerializer::num_columns() const { return metadata_->num_columns(); }
-
-int64_t RowGroupSerializer::num_rows() const {
-  if (current_column_writer_) {
-    CheckRowsWritten();
-  }
-  return num_rows_ < 0 ? 0 : num_rows_;
-}
-
-void RowGroupSerializer::CheckRowsWritten() const {
-  int64_t current_rows = current_column_writer_->rows_written();
-  if (num_rows_ < 0) {
-    num_rows_ = current_rows;
-    metadata_->set_num_rows(current_rows);
-  } else if (num_rows_ != current_rows) {
-    std::stringstream ss;
-    ss << "Column " << current_column_index_ << " had " << current_rows
-       << " while previous column had " << num_rows_;
-    throw ParquetException(ss.str());
-  }
-}
-
-ColumnWriter* RowGroupSerializer::NextColumn() {
-  if (current_column_writer_) {
-    CheckRowsWritten();
-  }
-
-  // Throws an error if more columns are being written
-  auto col_meta = metadata_->NextColumnChunk();
-
-  if (current_column_writer_) {
-    total_bytes_written_ += current_column_writer_->Close();
-  }
-
-  ++current_column_index_;
-
-  const ColumnDescriptor* column_descr = col_meta->descr();
-  std::unique_ptr<PageWriter> pager(
-      new SerializedPageWriter(sink_, properties_->compression(column_descr->path()),
-                               col_meta, properties_->memory_pool()));
-  current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
-  return current_column_writer_.get();
-}
-
-int RowGroupSerializer::current_column() const { return metadata_->current_column(); }
-
-void RowGroupSerializer::Close() {
-  if (!closed_) {
-    closed_ = true;
-
-    if (current_column_writer_) {
-      CheckRowsWritten();
-      total_bytes_written_ += current_column_writer_->Close();
-      current_column_writer_.reset();
-    }
-
-    // Ensures all columns have been written
-    metadata_->Finish(total_bytes_written_);
-  }
-}
-
-// ----------------------------------------------------------------------
-// FileSerializer
-
-std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open(
-    const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
-  std::unique_ptr<ParquetFileWriter::Contents> result(
-      new FileSerializer(sink, schema, properties, key_value_metadata));
-
-  return result;
-}
-
-void FileSerializer::Close() {
-  if (is_open_) {
-    if (row_group_writer_) {
-      num_rows_ += row_group_writer_->num_rows();
-      row_group_writer_->Close();
-    }
-    row_group_writer_.reset();
-
-    // Write magic bytes and metadata
-    WriteMetaData();
-
-    sink_->Close();
-    is_open_ = false;
-  }
-}
-
-int FileSerializer::num_columns() const { return schema_.num_columns(); }
-
-int FileSerializer::num_row_groups() const { return num_row_groups_; }
-
-int64_t FileSerializer::num_rows() const { return num_rows_; }
-
-const std::shared_ptr<WriterProperties>& FileSerializer::properties() const {
-  return properties_;
-}
-
-RowGroupWriter* FileSerializer::AppendRowGroup() {
-  if (row_group_writer_) {
-    row_group_writer_->Close();
-  }
-  num_row_groups_++;
-  auto rg_metadata = metadata_->AppendRowGroup();
-  std::unique_ptr<RowGroupWriter::Contents> contents(
-      new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
-  row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
-  return row_group_writer_.get();
-}
-
-FileSerializer::~FileSerializer() {
-  try {
-    Close();
-  } catch (...) {
-  }
-}
-
-void FileSerializer::WriteMetaData() {
-  // Write MetaData
-  uint32_t metadata_len = static_cast<uint32_t>(sink_->Tell());
-
-  // Get a FileMetaData
-  auto metadata = metadata_->Finish();
-  metadata->WriteTo(sink_.get());
-  metadata_len = static_cast<uint32_t>(sink_->Tell()) - metadata_len;
-
-  // Write Footer
-  sink_->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
-  sink_->Write(PARQUET_MAGIC, 4);
-}
-
-FileSerializer::FileSerializer(
-    const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
-    : ParquetFileWriter::Contents(schema, key_value_metadata),
-      sink_(sink),
-      is_open_(true),
-      properties_(properties),
-      num_row_groups_(0),
-      num_rows_(0),
-      metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) {
-  StartFile();
-}
-
-void FileSerializer::StartFile() {
-  // Parquet files always start with PAR1
-  sink_->Write(PARQUET_MAGIC, 4);
-}
-
-}  // namespace parquet
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
deleted file mode 100644
index 3cd73fe..0000000
--- a/src/parquet/file/writer-internal.h
+++ /dev/null
@@ -1,153 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_FILE_WRITER_INTERNAL_H
-#define PARQUET_FILE_WRITER_INTERNAL_H
-
-#include
-#include
-
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
-#include "parquet/file/writer.h"
-#include "parquet/parquet_types.h"
-#include "parquet/util/memory.h"
-
-namespace arrow {
-
-class Codec;
-};
-
-namespace parquet {
-
-// This subclass delimits pages appearing in a serialized stream, each preceded
-// by a serialized Thrift format::PageHeader indicating the type of each page
-// and the page metadata.
-class SerializedPageWriter : public PageWriter {
- public:
-  SerializedPageWriter(OutputStream* sink, Compression::type codec,
-                       ColumnChunkMetaDataBuilder* metadata,
-                       ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
-
-  virtual ~SerializedPageWriter() {}
-
-  int64_t WriteDataPage(const CompressedDataPage& page) override;
-
-  int64_t WriteDictionaryPage(const DictionaryPage& page) override;
-
-  /**
-   * Compress a buffer.
-   */
-  void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) override;
-
-  bool has_compressor() override { return (compressor_ != nullptr); }
-
-  void Close(bool has_dictionary, bool fallback) override;
-
- private:
-  OutputStream* sink_;
-  ColumnChunkMetaDataBuilder* metadata_;
-  ::arrow::MemoryPool* pool_;
-  int64_t num_values_;
-  int64_t dictionary_page_offset_;
-  int64_t data_page_offset_;
-  int64_t total_uncompressed_size_;
-  int64_t total_compressed_size_;
-
-  // Compression codec to use.
-  std::unique_ptr<::arrow::Codec> compressor_;
-};
-
-// RowGroupWriter::Contents implementation for the Parquet file specification
-class RowGroupSerializer : public RowGroupWriter::Contents {
- public:
-  RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
-                     const WriterProperties* properties)
-      : sink_(sink),
-        metadata_(metadata),
-        properties_(properties),
-        total_bytes_written_(0),
-        closed_(false),
-        current_column_index_(0),
-        num_rows_(-1) {}
-
-  int num_columns() const override;
-  int64_t num_rows() const override;
-
-  ColumnWriter* NextColumn() override;
-  int current_column() const override;
-  void Close() override;
-
- private:
-  OutputStream* sink_;
-  mutable RowGroupMetaDataBuilder* metadata_;
-  const WriterProperties* properties_;
-  int64_t total_bytes_written_;
-  bool closed_;
-  int current_column_index_;
-  mutable int64_t num_rows_;
-
-  void CheckRowsWritten() const;
-
-  std::shared_ptr<ColumnWriter> current_column_writer_;
-};
-
-// An implementation of ParquetFileWriter::Contents that deals with the Parquet
-// file structure, Thrift serialization, and other internal matters
-
-class FileSerializer : public ParquetFileWriter::Contents {
- public:
-  static std::unique_ptr<ParquetFileWriter::Contents> Open(
-      const std::shared_ptr<OutputStream>& sink,
-      const std::shared_ptr<schema::GroupNode>& schema,
-      const std::shared_ptr<WriterProperties>& properties = default_writer_properties(),
-      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata = nullptr);
-
-  void Close() override;
-
-  RowGroupWriter* AppendRowGroup() override;
-
-  const std::shared_ptr<WriterProperties>& properties() const override;
-
-  int num_columns() const override;
-  int num_row_groups() const override;
-  int64_t num_rows() const override;
-
-  virtual ~FileSerializer();
-
- private:
-  explicit FileSerializer(
-      const std::shared_ptr<OutputStream>& sink,
-      const std::shared_ptr<schema::GroupNode>& schema,
-      const std::shared_ptr<WriterProperties>& properties,
-      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata);
-
-  std::shared_ptr<OutputStream> sink_;
-  bool is_open_;
-  const std::shared_ptr<WriterProperties> properties_;
-  int num_row_groups_;
-  int64_t num_rows_;
-  std::unique_ptr<FileMetaDataBuilder> metadata_;
-  std::unique_ptr<RowGroupWriter> row_group_writer_;
-
-  void StartFile();
-  void WriteMetaData();
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_FILE_WRITER_INTERNAL_H
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
deleted file mode 100644
index a91553b..0000000
--- a/src/parquet/file/writer.cc
+++ /dev/null
@@ -1,119 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements. See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership. The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License. You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied. See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/file/writer.h"
-
-#include "parquet/file/writer-internal.h"
-#include "parquet/util/memory.h"
-
-using parquet::schema::GroupNode;
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// RowGroupWriter public API
-
-RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
-    : contents_(std::move(contents)) {}
-
-void RowGroupWriter::Close() {
-  if (contents_) {
-    contents_->Close();
-  }
-}
-
-ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
-
-int RowGroupWriter::current_column() { return contents_->current_column(); }
-
-int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
-
-int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
-
-// ----------------------------------------------------------------------
-// ParquetFileWriter public API
-
-ParquetFileWriter::ParquetFileWriter() {}
-
-ParquetFileWriter::~ParquetFileWriter() {
-  try {
-    Close();
-  } catch (...) {
-  }
-}
-
-std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
-    const std::shared_ptr<::arrow::io::OutputStream>& sink,
-    const std::shared_ptr<GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
-  return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties,
-              key_value_metadata);
-}
-
-std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
-    const std::shared_ptr<OutputStream>& sink,
-    const std::shared_ptr<GroupNode>& schema,
-    const std::shared_ptr<WriterProperties>& properties,
-    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
-  auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
-  std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
-  result->Open(std::move(contents));
-  return result;
-}
-
-const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
-
-const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
-  return contents_->schema()->Column(i);
-}
-
-int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
-
-int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
-
-int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
-
-const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
-    const {
-  return contents_->key_value_metadata();
-}
-
-void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
-  contents_ = std::move(contents);
-}
-
-void ParquetFileWriter::Close() {
-  if (contents_) {
-    contents_->Close();
-    contents_.reset();
-  }
-}
-
-RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
-  return contents_->AppendRowGroup();
-}
-
-RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
-  return AppendRowGroup();
-}
-
-const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
-  return contents_->properties();
-}
-
-}  // namespace parquet
diff --git a/src/parquet/file/reader.cc b/src/parquet/file_reader.cc
similarity index 50%
rename from src/parquet/file/reader.cc
rename to src/parquet/file_reader.cc
index 9b9bde9..72c71c6 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file_reader.cc
@@ -15,13 +15,16 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
 
+#include
+#include
 #include
 #include
 #include
 #include
 #include
+#include
 
 #include "arrow/io/file.h"
 
@@ -29,15 +32,31 @@
 #include "parquet/column_reader.h"
 #include "parquet/column_scanner.h"
 #include "parquet/exception.h"
-#include "parquet/file/reader-internal.h"
+#include "parquet/metadata.h"
+#include "parquet/parquet_types.h"
+#include "parquet/properties.h"
 #include "parquet/types.h"
 #include "parquet/util/logging.h"
 #include "parquet/util/memory.h"
 
 using std::string;
 
+namespace arrow {
+
+class Codec;
+
+}  // namespace arrow
+
 namespace parquet {
 
+// PARQUET-978: Minimize footer reads by reading 64 KB from the end of the file
+static constexpr int64_t DEFAULT_FOOTER_READ_SIZE = 64 * 1024;
+static constexpr uint32_t FOOTER_SIZE = 8;
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// For PARQUET-816
+static constexpr int64_t kMaxDictHeaderSize = 100;
+
 // ----------------------------------------------------------------------
 // RowGroupReader public API
 
@@ -66,10 +85,145 @@ std::unique_ptr<PageReader> RowGroupReader::GetColumnPageReader(int i) {
 
 // Returns the rowgroup metadata
 const RowGroupMetaData* RowGroupReader::metadata() const { return contents_->metadata(); }
 
+// RowGroupReader::Contents implementation for the Parquet file specification
+class SerializedRowGroup : public RowGroupReader::Contents {
+ public:
+  SerializedRowGroup(RandomAccessSource* source, FileMetaData* file_metadata,
+                     int row_group_number, const ReaderProperties& props)
+      : source_(source), file_metadata_(file_metadata), properties_(props) {
+    row_group_metadata_ = file_metadata->RowGroup(row_group_number);
+  }
+
+  const RowGroupMetaData* metadata() const override { return row_group_metadata_.get(); }
+
+  const ReaderProperties* properties() const override { return &properties_; }
+
+  std::unique_ptr<PageReader> GetColumnPageReader(int i) override {
+    // Read column chunk from the file
+    auto col = row_group_metadata_->ColumnChunk(i);
+
+    int64_t col_start = col->data_page_offset();
+    if (col->has_dictionary_page() && col_start > col->dictionary_page_offset()) {
+      col_start = col->dictionary_page_offset();
+    }
+
+    int64_t col_length = col->total_compressed_size();
+    std::unique_ptr<InputStream> stream;
+
+    // PARQUET-816 workaround for old files created by older parquet-mr
+    const ApplicationVersion& version = file_metadata_->writer_version();
+    if (version.VersionLt(ApplicationVersion::PARQUET_816_FIXED_VERSION)) {
+      // The Parquet MR writer had a bug in 1.2.8 and below where it didn't include the
+      // dictionary page header size in total_compressed_size and total_uncompressed_size
+      // (see IMPALA-694). We add padding to compensate.
+      int64_t bytes_remaining = source_->Size() - (col_start + col_length);
+      int64_t padding = std::min(kMaxDictHeaderSize, bytes_remaining);
+      col_length += padding;
+    }
+
+    stream = properties_.GetStream(source_, col_start, col_length);
+
+    return PageReader::Open(std::move(stream), col->num_values(), col->compression(),
+                            properties_.memory_pool());
+  }
+
+ private:
+  RandomAccessSource* source_;
+  FileMetaData* file_metadata_;
+  std::unique_ptr<RowGroupMetaData> row_group_metadata_;
+  ReaderProperties properties_;
+};
+
+// ----------------------------------------------------------------------
+// SerializedFile: An implementation of ParquetFileReader::Contents that deals
+// with the Parquet file structure, Thrift deserialization, and other internal
+// matters
+
+// This class takes ownership of the provided data source
+class SerializedFile : public ParquetFileReader::Contents {
+ public:
+  SerializedFile(std::unique_ptr<RandomAccessSource> source,
+                 const ReaderProperties& props = default_reader_properties())
+      : source_(std::move(source)), properties_(props) {}
+
+  ~SerializedFile() {
+    try {
+      Close();
+    } catch (...) {
+    }
+  }
+
+  void Close() override { source_->Close(); }
+
+  std::shared_ptr<RowGroupReader> GetRowGroup(int i) override {
+    std::unique_ptr<RowGroupReader::Contents> contents(
+        new SerializedRowGroup(source_.get(), file_metadata_.get(), i, properties_));
+    return std::make_shared<RowGroupReader>(std::move(contents));
+  }
+
+  std::shared_ptr<FileMetaData> metadata() const override { return file_metadata_; }
+
+  void set_metadata(const std::shared_ptr<FileMetaData>& metadata) {
+    file_metadata_ = metadata;
+  }
+
+  void ParseMetaData() {
+    int64_t file_size = source_->Size();
+
+    if (file_size < FOOTER_SIZE) {
+      throw ParquetException("Corrupted file, smaller than file footer");
+    }
+
+    uint8_t footer_buffer[DEFAULT_FOOTER_READ_SIZE];
+    int64_t footer_read_size = std::min(file_size, DEFAULT_FOOTER_READ_SIZE);
+    int64_t bytes_read =
+        source_->ReadAt(file_size - footer_read_size, footer_read_size, footer_buffer);
+
+    // Check if all bytes are read. Check if last 4 bytes read have the magic bits
+    if (bytes_read != footer_read_size ||
+        memcmp(footer_buffer + footer_read_size - 4, PARQUET_MAGIC, 4) != 0) {
+      throw ParquetException("Invalid parquet file. Corrupt footer.");
+    }
+
+    uint32_t metadata_len =
+        *reinterpret_cast<uint32_t*>(footer_buffer + footer_read_size - FOOTER_SIZE);
+    int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
+    if (FOOTER_SIZE + metadata_len > file_size) {
+      throw ParquetException(
+          "Invalid parquet file. File is less than "
+          "file metadata size.");
+    }
+
+    std::shared_ptr<PoolBuffer> metadata_buffer =
+        AllocateBuffer(properties_.memory_pool(), metadata_len);
+
+    // Check if the footer_buffer contains the entire metadata
+    if (footer_read_size >= (metadata_len + FOOTER_SIZE)) {
+      memcpy(metadata_buffer->mutable_data(),
+             footer_buffer + (footer_read_size - metadata_len - FOOTER_SIZE),
+             metadata_len);
+    } else {
+      bytes_read =
+          source_->ReadAt(metadata_start, metadata_len, metadata_buffer->mutable_data());
+      if (bytes_read != metadata_len) {
+        throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
+      }
+    }
+
+    file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
+  }
+
+ private:
+  std::unique_ptr<RandomAccessSource> source_;
+  std::shared_ptr<FileMetaData> file_metadata_;
+  ReaderProperties properties_;
+};
+
 // ----------------------------------------------------------------------
 // ParquetFileReader public API
 
 ParquetFileReader::ParquetFileReader() {}
+
 ParquetFileReader::~ParquetFileReader() {
   try {
     Close();
@@ -77,6 +231,27 @@ ParquetFileReader::~ParquetFileReader() {
   }
 }
 
+// Open the file. If no metadata is passed, it is parsed from the footer of
+// the file
+std::unique_ptr<ParquetFileReader::Contents> ParquetFileReader::Contents::Open(
+    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props,
+    const std::shared_ptr<FileMetaData>& metadata) {
+  std::unique_ptr<ParquetFileReader::Contents> result(
+      new SerializedFile(std::move(source), props));
+
+  // Access private methods here, but otherwise unavailable
+  SerializedFile* file = static_cast<SerializedFile*>(result.get());
+
+  if (metadata == nullptr) {
+    // Validates magic bytes, parses metadata, and initializes the SchemaDescriptor
+    file->ParseMetaData();
+  } else {
+    file->set_metadata(metadata);
+  }
+
+  return result;
+}
+
 std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
     const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
     const ReaderProperties& props, const std::shared_ptr<FileMetaData>& metadata) {
diff --git a/src/parquet/file/reader.h b/src/parquet/file_reader.h
similarity index 94%
rename from src/parquet/file/reader.h
rename to src/parquet/file_reader.h
index 7558394..f751e9b 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file_reader.h
@@ -25,8 +25,8 @@
 #include
 #include
 
-#include "parquet/column_page.h"
-#include "parquet/file/metadata.h"
+#include "parquet/column_reader.h"
+#include "parquet/metadata.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
@@ -71,6 +71,11 @@ class PARQUET_EXPORT ParquetFileReader {
   // easily create test fixtures
   // An implementation of the Contents class is defined in the .cc file
   struct Contents {
+    static std::unique_ptr<Contents> Open(
+        std::unique_ptr<RandomAccessSource> source,
+        const ReaderProperties& props = default_reader_properties(),
+        const std::shared_ptr<FileMetaData>& metadata = nullptr);
+
     virtual ~Contents() {}
     // Perform any cleanup associated with the file contents
     virtual void Close() = 0;
diff --git a/src/parquet/file_writer.cc b/src/parquet/file_writer.cc
new file mode 100644
index 0000000..87ee4f6
--- /dev/null
+++ b/src/parquet/file_writer.cc
@@ -0,0 +1,323 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/file_writer.h"
+
+#include "parquet/column_writer.h"
+#include "parquet/schema-internal.h"
+#include "parquet/schema.h"
+#include "parquet/thrift.h"
+#include "parquet/util/memory.h"
+
+using arrow::MemoryPool;
+
+using parquet::schema::GroupNode;
+using parquet::schema::SchemaFlattener;
+
+namespace parquet {
+
+// FIXME: copied from reader-internal.cc
+static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
+
+// ----------------------------------------------------------------------
+// RowGroupWriter public API
+
+RowGroupWriter::RowGroupWriter(std::unique_ptr<Contents> contents)
+    : contents_(std::move(contents)) {}
+
+void RowGroupWriter::Close() {
+  if (contents_) {
+    contents_->Close();
+  }
+}
+
+ColumnWriter* RowGroupWriter::NextColumn() { return contents_->NextColumn(); }
+
+int RowGroupWriter::current_column() { return contents_->current_column(); }
+
+int RowGroupWriter::num_columns() const { return contents_->num_columns(); }
+
+int64_t RowGroupWriter::num_rows() const { return contents_->num_rows(); }
+
+// ----------------------------------------------------------------------
+// RowGroupSerializer
+
+// RowGroupWriter::Contents implementation for the Parquet file specification
+class RowGroupSerializer : public RowGroupWriter::Contents {
+ public:
+  RowGroupSerializer(OutputStream* sink, RowGroupMetaDataBuilder* metadata,
+                     const WriterProperties* properties)
+      : sink_(sink),
+        metadata_(metadata),
+        properties_(properties),
+        total_bytes_written_(0),
+        closed_(false),
+        current_column_index_(0),
+        num_rows_(-1) {}
+
+  int num_columns() const override { return metadata_->num_columns(); }
+
+  int64_t num_rows() const override {
+    if (current_column_writer_) {
+      CheckRowsWritten();
+    }
+    return num_rows_ < 0 ? 0 : num_rows_;
+  }
+
+  ColumnWriter* NextColumn() override {
+    if (current_column_writer_) {
+      CheckRowsWritten();
+    }
+
+    // Throws an error if more columns are being written
+    auto col_meta = metadata_->NextColumnChunk();
+
+    if (current_column_writer_) {
+      total_bytes_written_ += current_column_writer_->Close();
+    }
+
+    ++current_column_index_;
+
+    const ColumnDescriptor* column_descr = col_meta->descr();
+    std::unique_ptr<PageWriter> pager =
+        PageWriter::Open(sink_, properties_->compression(column_descr->path()), col_meta,
+                         properties_->memory_pool());
+    current_column_writer_ = ColumnWriter::Make(col_meta, std::move(pager), properties_);
+    return current_column_writer_.get();
+  }
+
+  int current_column() const override { return metadata_->current_column(); }
+
+  void Close() override {
+    if (!closed_) {
+      closed_ = true;
+
+      if (current_column_writer_) {
+        CheckRowsWritten();
+        total_bytes_written_ += current_column_writer_->Close();
+        current_column_writer_.reset();
+      }
+
+      // Ensures all columns have been written
+      metadata_->Finish(total_bytes_written_);
+    }
+  }
+
+ private:
+  OutputStream* sink_;
+  mutable RowGroupMetaDataBuilder* metadata_;
+  const WriterProperties* properties_;
+  int64_t total_bytes_written_;
+  bool closed_;
+  int current_column_index_;
+  mutable int64_t num_rows_;
+
+  void CheckRowsWritten() const {
+    int64_t current_rows = current_column_writer_->rows_written();
+    if (num_rows_ < 0) {
+      num_rows_ = current_rows;
+      metadata_->set_num_rows(current_rows);
+    } else if (num_rows_ != current_rows) {
+      std::stringstream ss;
+      ss << "Column " << current_column_index_ << " had " << current_rows
+         << " while previous column had " << num_rows_;
+      throw ParquetException(ss.str());
+    }
+  }
+
+  std::shared_ptr<ColumnWriter> current_column_writer_;
+};
+
+// ----------------------------------------------------------------------
+// FileSerializer
+
+// An implementation of ParquetFileWriter::Contents that deals with the Parquet
+// file structure, Thrift serialization, and other internal matters
+
+class FileSerializer : public ParquetFileWriter::Contents {
+ public:
+  static std::unique_ptr<ParquetFileWriter::Contents> Open(
+      const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
+      const std::shared_ptr<WriterProperties>& properties,
+      const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+    std::unique_ptr<ParquetFileWriter::Contents> result(
+        new FileSerializer(sink, schema, properties, key_value_metadata));
+
+    return result;
+  }
+
+  void Close() override {
+    if (is_open_) {
+      if (row_group_writer_) {
+        num_rows_ += row_group_writer_->num_rows();
+        row_group_writer_->Close();
+      }
+      row_group_writer_.reset();
+
+      // Write magic bytes and metadata
+      WriteMetaData();
+
+      sink_->Close();
+      is_open_ = false;
+    }
+  }
+
+  int num_columns() const override { return schema_.num_columns(); }
+
+  int num_row_groups() const override { return num_row_groups_; }
+
+  int64_t num_rows() const override { return num_rows_; }
+
+  const std::shared_ptr<WriterProperties>& properties() const override {
+    return properties_;
+  }
+
+  RowGroupWriter* AppendRowGroup() override {
+    if (row_group_writer_) {
+      row_group_writer_->Close();
+    }
+    num_row_groups_++;
+    auto rg_metadata = metadata_->AppendRowGroup();
+    std::unique_ptr<RowGroupWriter::Contents> contents(
+        new RowGroupSerializer(sink_.get(), rg_metadata, properties_.get()));
+    row_group_writer_.reset(new RowGroupWriter(std::move(contents)));
+    return row_group_writer_.get();
+  }
+
+  ~FileSerializer() {
+    try {
+      Close();
+    } catch (...) {
+    }
+  }
+
+ private:
+  FileSerializer(const std::shared_ptr<OutputStream>& sink,
+                 const std::shared_ptr<GroupNode>& schema,
+                 const std::shared_ptr<WriterProperties>& properties,
+                 const std::shared_ptr<const KeyValueMetadata>& key_value_metadata)
+      : ParquetFileWriter::Contents(schema, key_value_metadata),
+        sink_(sink),
+        is_open_(true),
+        properties_(properties),
+        num_row_groups_(0),
+        num_rows_(0),
+        metadata_(FileMetaDataBuilder::Make(&schema_, properties, key_value_metadata)) {
+    StartFile();
+  }
+
+  std::shared_ptr<OutputStream> sink_;
+  bool is_open_;
+  const std::shared_ptr<WriterProperties> properties_;
+  int num_row_groups_;
+  int64_t num_rows_;
+  std::unique_ptr<FileMetaDataBuilder> metadata_;
+  std::unique_ptr<RowGroupWriter> row_group_writer_;
+
+  void StartFile() {
+    // Parquet files always start with PAR1
+    sink_->Write(PARQUET_MAGIC, 4);
+  }
+
+  void WriteMetaData() {
+    // Write MetaData
+    uint32_t metadata_len = static_cast<uint32_t>(sink_->Tell());
+
+    // Get a FileMetaData
+    auto metadata = metadata_->Finish();
+    metadata->WriteTo(sink_.get());
+    metadata_len = static_cast<uint32_t>(sink_->Tell()) - metadata_len;
+
+    // Write Footer
+    sink_->Write(reinterpret_cast<uint8_t*>(&metadata_len), 4);
+    sink_->Write(PARQUET_MAGIC, 4);
+  }
+};
+
+// ----------------------------------------------------------------------
+// ParquetFileWriter public API
+
+ParquetFileWriter::ParquetFileWriter() {}
+
+ParquetFileWriter::~ParquetFileWriter() {
+  try {
+    Close();
+  } catch (...) {
+  }
+}
+
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+    const std::shared_ptr<::arrow::io::OutputStream>& sink,
+    const std::shared_ptr<GroupNode>& schema,
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+  return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties,
+              key_value_metadata);
+}
+
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+    const std::shared_ptr<OutputStream>& sink,
+    const std::shared_ptr<GroupNode>& schema,
+    const std::shared_ptr<WriterProperties>& properties,
+    const std::shared_ptr<const KeyValueMetadata>& key_value_metadata) {
+  auto contents = FileSerializer::Open(sink, schema, properties, key_value_metadata);
+  std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
+  result->Open(std::move(contents));
+  return result;
+}
+
+const SchemaDescriptor* ParquetFileWriter::schema() const { return contents_->schema(); }
+
+const ColumnDescriptor* ParquetFileWriter::descr(int i) const {
+  return contents_->schema()->Column(i);
+}
+
+int ParquetFileWriter::num_columns() const { return contents_->num_columns(); }
+
+int64_t ParquetFileWriter::num_rows() const { return contents_->num_rows(); }
+
+int ParquetFileWriter::num_row_groups() const { return contents_->num_row_groups(); }
+
+const std::shared_ptr<const KeyValueMetadata>& ParquetFileWriter::key_value_metadata()
+    const {
+  return contents_->key_value_metadata();
+}
+
+void ParquetFileWriter::Open(std::unique_ptr<ParquetFileWriter::Contents> contents) {
+  contents_ = std::move(contents);
+}
+
+void ParquetFileWriter::Close() {
+  if (contents_) {
+    contents_->Close();
+    contents_.reset();
+  }
+}
+
+RowGroupWriter* ParquetFileWriter::AppendRowGroup() {
+  return contents_->AppendRowGroup();
+}
+
+RowGroupWriter* ParquetFileWriter::AppendRowGroup(int64_t num_rows) {
+  return AppendRowGroup();
+}
+
+const std::shared_ptr<WriterProperties>& ParquetFileWriter::properties() const {
+  return contents_->properties();
+}
+
+}  // namespace parquet
diff --git a/src/parquet/file/writer.h b/src/parquet/file_writer.h
similarity index 99%
rename from src/parquet/file/writer.h
rename to src/parquet/file_writer.h
index 844aa16..f165261 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file_writer.h
@@ -21,7 +21,7 @@
 #include
 #include
 
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
 #include "parquet/properties.h"
 #include "parquet/schema.h"
 #include "parquet/util/memory.h"
diff --git a/src/parquet/file/file-metadata-test.cc b/src/parquet/metadata-test.cc
similarity index 99%
rename from src/parquet/file/file-metadata-test.cc
rename to src/parquet/metadata-test.cc
index 49cba3a..b20293b 100644
--- a/src/parquet/file/file-metadata-test.cc
+++ b/src/parquet/metadata-test.cc
@@ -15,8 +15,8 @@
 // specific language governing permissions and limitations
 // under the License.
 
+#include "parquet/metadata.h"
 #include
-#include "parquet/file/metadata.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
 #include "parquet/types.h"
diff --git a/src/parquet/file/metadata.cc b/src/parquet/metadata.cc
similarity index 99%
rename from src/parquet/file/metadata.cc
rename to src/parquet/metadata.cc
index 9f1cdd7..1c7db86 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/metadata.cc
@@ -20,7 +20,7 @@
 #include
 
 #include "parquet/exception.h"
-#include "parquet/file/metadata.h"
+#include "parquet/metadata.h"
 #include "parquet/schema-internal.h"
 #include "parquet/schema.h"
 #include "parquet/thrift.h"
diff --git a/src/parquet/file/metadata.h b/src/parquet/metadata.h
similarity index 100%
rename from src/parquet/file/metadata.h
rename to src/parquet/metadata.h
diff --git a/src/parquet/file/printer.cc b/src/parquet/printer.cc
similarity index 99%
rename from src/parquet/file/printer.cc
rename to src/parquet/printer.cc
index 727552d..88b5528 100644
--- a/src/parquet/file/printer.cc
+++ b/src/parquet/printer.cc
@@ -15,7 +15,7 @@
 // specific language governing permissions and limitations
 // under the License.
 
-#include "parquet/file/printer.h"
+#include "parquet/printer.h"
 
 #include
 #include
diff --git a/src/parquet/file/printer.h b/src/parquet/printer.h
similarity index 97%
rename from src/parquet/file/printer.h
rename to src/parquet/printer.h
index a18af4a..3b82882 100644
--- a/src/parquet/file/printer.h
+++ b/src/parquet/printer.h
@@ -25,7 +25,7 @@
 #include
 #include
 
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
 
 namespace parquet {
diff --git a/src/parquet/properties-test.cc b/src/parquet/properties-test.cc
index 4a063c1..c740b59 100644
--- a/src/parquet/properties-test.cc
+++ b/src/parquet/properties-test.cc
@@ -19,7 +19,7 @@
 
 #include
 
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
 #include "parquet/properties.h"
 
 namespace parquet {
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index cefa452..c536fdc 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -27,9 +27,8 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_scanner.h"
-#include "parquet/file/printer.h"
-#include "parquet/file/reader-internal.h"
-#include "parquet/file/reader.h"
+#include "parquet/file_reader.h"
+#include "parquet/printer.h"
 #include "parquet/util/memory.h"
 
 using std::string;
@@ -204,7 +203,7 @@ class HelperFileClosed : public ArrowInputFile {
 TEST_F(TestLocalFile, FileClosedOnDestruction) {
   bool close_called = false;
   {
-    auto contents = SerializedFile::Open(
+    auto contents = ParquetFileReader::Contents::Open(
         std::unique_ptr<RandomAccessSource>(new HelperFileClosed(handle, &close_called)));
     std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
     result->Open(std::move(contents));
diff --git a/src/parquet/statistics-test.cc b/src/parquet/statistics-test.cc
index fc905b5..e5992c6 100644
--- a/src/parquet/statistics-test.cc
+++ b/src/parquet/statistics-test.cc
@@ -26,8 +26,8 @@
 
 #include "parquet/column_reader.h"
 #include "parquet/column_writer.h"
-#include "parquet/file/reader.h"
-#include "parquet/file/writer.h"
+#include "parquet/file_reader.h"
+#include "parquet/file_writer.h"
 #include "parquet/schema.h"
 #include "parquet/statistics.h"
 #include "parquet/test-specialization.h"
@@ -194,8 +194,9 @@ bool* TestRowGroupStatistics<BooleanType>::GetValuesPointer(std::vector<bool>& v
 }
 
 template <typename TestType>
-typename std::vector<typename TestType::c_type> TestRowGroupStatistics<
-    TestType>::GetDeepCopy(const std::vector<typename TestType::c_type>& values) {
+typename std::vector<typename TestType::c_type>
+TestRowGroupStatistics<TestType>::GetDeepCopy(
+    const std::vector<typename TestType::c_type>& values) {
   return values;
 }
diff --git a/src/parquet/test-util.h b/src/parquet/test-util.h
index 0698652..ac6d0a1 100644
--- a/src/parquet/test-util.h
+++ b/src/parquet/test-util.h
@@ -79,8 +79,7 @@ class MockPageReader : public PageReader {
   explicit MockPageReader(const vector<shared_ptr<Page>>& pages)
      : pages_(pages), page_index_(0) {}
 
-  // Implement the PageReader interface
-  virtual shared_ptr<Page> NextPage() {
+  shared_ptr<Page> NextPage() override {
    if (page_index_ == static_cast<int>(pages_.size())) {
      // EOS to consumer
      return shared_ptr<Page>(nullptr);
@@ -88,6 +87,9 @@
    return pages_[page_index_++];
  }
 
+  // No-op
+  void set_max_page_header_size(uint32_t size) override {}
+
 private:
  vector<shared_ptr<Page>> pages_;
  int page_index_;
-- 
To stop receiving notification emails like this one, please contact
"commits@parquet.apache.org" <commits@parquet.apache.org>.
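
For readers following the footer logic consolidated above: FileSerializer::WriteMetaData() ends the file with the serialized Thrift metadata, a 4-byte metadata length, and the 4-byte magic "PAR1", and SerializedFile::ParseMetaData() reads those same 8 bytes back. The standalone C++ sketch below is not part of this commit; it only illustrates that layout, and like ParseMetaData it assumes a little-endian host when reinterpreting the length bytes.

    #include <cstdint>
    #include <cstring>
    #include <fstream>
    #include <iostream>

    int main(int argc, char** argv) {
      if (argc < 2) return 1;
      std::ifstream file(argv[1], std::ios::binary | std::ios::ate);
      int64_t file_size = file.tellg();
      if (file_size < 8) {
        std::cerr << "Corrupted file, smaller than file footer" << std::endl;
        return 1;
      }

      // Last 8 bytes of the file: 4-byte metadata length, then the magic "PAR1"
      char footer[8];
      file.seekg(file_size - 8);
      file.read(footer, 8);
      if (std::memcmp(footer + 4, "PAR1", 4) != 0) {
        std::cerr << "Invalid parquet file. Corrupt footer." << std::endl;
        return 1;
      }

      // Same reinterpretation ParseMetaData performs on the footer buffer
      uint32_t metadata_len;
      std::memcpy(&metadata_len, footer, 4);
      std::cout << "Thrift file metadata: " << metadata_len
                << " bytes, starting at offset "
                << (file_size - 8 - metadata_len) << std::endl;
      return 0;
    }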
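After this flattening, client code includes only parquet/file_reader.h and parquet/file_writer.h rather than the old parquet/file/ headers. The following is a minimal usage sketch, not code from this commit: the file name, column name, and the LocalFileOutputStream, Int32Writer, and OpenFile helpers from parquet/util/memory.h, parquet/column_writer.h, and parquet/file_reader.h are assumed to be available as in the parquet-cpp tree of this era.

    #include <iostream>
    #include <memory>

    #include "parquet/column_writer.h"
    #include "parquet/file_reader.h"
    #include "parquet/file_writer.h"
    #include "parquet/schema.h"
    #include "parquet/util/memory.h"

    using parquet::schema::GroupNode;
    using parquet::schema::PrimitiveNode;

    int main() {
      // One required INT32 column named "id" (hypothetical schema)
      parquet::schema::NodeVector fields;
      fields.push_back(PrimitiveNode::Make("id", parquet::Repetition::REQUIRED,
                                           parquet::Type::INT32));
      auto schema = std::static_pointer_cast<GroupNode>(
          GroupNode::Make("schema", parquet::Repetition::REQUIRED, fields));

      // Write one row group with a single value; Close() emits footer + PAR1
      std::shared_ptr<parquet::OutputStream> sink(
          new parquet::LocalFileOutputStream("example.parquet"));
      auto writer = parquet::ParquetFileWriter::Open(sink, schema);
      auto* rg_writer = writer->AppendRowGroup();
      auto* id_writer = static_cast<parquet::Int32Writer*>(rg_writer->NextColumn());
      int32_t value = 42;
      id_writer->WriteBatch(1, nullptr, nullptr, &value);
      writer->Close();

      // Read the footer back through the consolidated reader header
      auto reader = parquet::ParquetFileReader::OpenFile("example.parquet");
      std::cout << reader->metadata()->num_rows() << " rows in "
                << reader->metadata()->num_row_groups() << " row group(s)"
                << std::endl;
      return 0;
    }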