Return-Path: X-Original-To: apmail-parquet-commits-archive@minotaur.apache.org Delivered-To: apmail-parquet-commits-archive@minotaur.apache.org Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by minotaur.apache.org (Postfix) with SMTP id DE0F518F27 for ; Thu, 11 Feb 2016 06:31:26 +0000 (UTC) Received: (qmail 97767 invoked by uid 500); 11 Feb 2016 06:31:26 -0000 Delivered-To: apmail-parquet-commits-archive@parquet.apache.org Received: (qmail 97734 invoked by uid 500); 11 Feb 2016 06:31:26 -0000 Mailing-List: contact commits-help@parquet.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.apache.org Delivered-To: mailing list commits@parquet.apache.org Received: (qmail 97725 invoked by uid 99); 11 Feb 2016 06:31:26 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Thu, 11 Feb 2016 06:31:26 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id A2287DFE01; Thu, 11 Feb 2016 06:31:26 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: julien@apache.org To: commits@parquet.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: parquet-cpp git commit: PARQUET-501: Add OutputStream abstract interface, refactor encoding code paths Date: Thu, 11 Feb 2016 06:31:26 +0000 (UTC) Repository: parquet-cpp Updated Branches: refs/heads/master 4d735876d -> c11e7d487 PARQUET-501: Add OutputStream abstract interface, refactor encoding code paths I also did a bit of tidying / reorganization and giving interfaces more descriptive names. Author: Wes McKinney Closes #46 from wesm/PARQUET-501 and squashes the following commits: 491aa89 [Wes McKinney] * Add a basic OutputStream abstract interface and an InMemoryOutputStream implementation for testing. 
* Refactor to use OutputStream on data encoding paths, reduce some code duplication in column-reader-test. * Collect all input/output classes into util/input.* and util/output.*. * Use int64_t in InputStream::Peek/Read. Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c11e7d48 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c11e7d48 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c11e7d48 Branch: refs/heads/master Commit: c11e7d487aba2ba91efb261fa20dda4dd6498ac7 Parents: 4d73587 Author: Wes McKinney Authored: Wed Feb 10 22:31:22 2016 -0800 Committer: Julien Le Dem Committed: Wed Feb 10 22:31:22 2016 -0800 ---------------------------------------------------------------------- example/parquet-dump-schema.cc | 2 +- example/parquet_reader.cc | 2 +- src/parquet/column/column-reader-test.cc | 55 +++------- src/parquet/column/serialized-page.cc | 3 +- src/parquet/column/serialized-page.h | 2 +- src/parquet/column/test-util.h | 100 +++++++++-------- src/parquet/encodings/encodings.h | 8 +- src/parquet/encodings/plain-encoding-test.cc | 11 +- src/parquet/encodings/plain-encoding.h | 39 ++++--- src/parquet/parquet.h | 4 +- src/parquet/reader-test.cc | 3 +- src/parquet/reader.cc | 47 +------- src/parquet/reader.h | 47 +------- src/parquet/util/CMakeLists.txt | 7 +- src/parquet/util/input.cc | 110 +++++++++++++++++++ src/parquet/util/input.h | 128 ++++++++++++++++++++++ src/parquet/util/input_stream.cc | 63 ----------- src/parquet/util/input_stream.h | 80 -------------- src/parquet/util/output-test.cc | 44 ++++++++ src/parquet/util/output.cc | 73 ++++++++++++ src/parquet/util/output.h | 71 ++++++++++++ src/parquet/util/test-common.h | 23 ++++ 22 files changed, 564 insertions(+), 358 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/example/parquet-dump-schema.cc 
---------------------------------------------------------------------- diff --git a/example/parquet-dump-schema.cc b/example/parquet-dump-schema.cc index 9471225..09c715c 100644 --- a/example/parquet-dump-schema.cc +++ b/example/parquet-dump-schema.cc @@ -35,7 +35,7 @@ int main(int argc, char** argv) { std::string filename = argv[1]; parquet_cpp::ParquetFileReader reader; - parquet_cpp::LocalFile file; + parquet_cpp::LocalFileSource file; file.Open(filename); if (!file.is_open()) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/example/parquet_reader.cc ---------------------------------------------------------------------- diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc index 621f0ba..ca717df 100644 --- a/example/parquet_reader.cc +++ b/example/parquet_reader.cc @@ -40,7 +40,7 @@ int main(int argc, char** argv) { } parquet_cpp::ParquetFileReader reader; - parquet_cpp::LocalFile file; + parquet_cpp::LocalFileSource file; file.Open(filename); if (!file.is_open()) { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/column-reader-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc index 0d4aea1..84a36db 100644 --- a/src/parquet/column/column-reader-test.cc +++ b/src/parquet/column/column-reader-test.cc @@ -29,6 +29,7 @@ #include "parquet/column/reader.h" #include "parquet/column/test-util.h" +#include "parquet/util/output.h" #include "parquet/util/test-common.h" using std::string; @@ -60,31 +61,15 @@ class TestPrimitiveReader : public ::testing::Test { vector > pages_; }; -template -static vector slice(const vector& values, size_t start, size_t end) { - if (end < start) { - return vector(0); - } - - vector out(end - start); - for (size_t i = start; i < end; ++i) { - out[i - start] = values[i]; - } - return out; -} - TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { vector 
values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10}; - size_t num_values = values.size(); - parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN; - vector page1; - test::DataPageBuilder page_builder(&page1); - page_builder.AppendValues(values, parquet::Encoding::PLAIN); - pages_.push_back(page_builder.Finish()); + std::vector buffer; + std::shared_ptr page = MakeDataPage(values, {}, 0, + {}, 0, &buffer); + pages_.push_back(page); - // TODO: simplify this NodePtr type = schema::Int32("a", Repetition::REQUIRED); ColumnDescriptor descr(type, 0, 0); InitReader(&descr); @@ -102,21 +87,16 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequired) { ASSERT_TRUE(vector_equal(result, values)); } + TEST_F(TestPrimitiveReader, TestInt32FlatOptional) { vector values = {1, 2, 3, 4, 5}; vector def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1}; - size_t num_values = values.size(); - parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN; - - vector page1; - test::DataPageBuilder page_builder(&page1); - - // Definition levels precede the values - page_builder.AppendDefLevels(def_levels, 1, parquet::Encoding::RLE); - page_builder.AppendValues(values, parquet::Encoding::PLAIN); + std::vector buffer; + std::shared_ptr page = MakeDataPage(values, def_levels, 1, + {}, 0, &buffer); - pages_.push_back(page_builder.Finish()); + pages_.push_back(page); NodePtr type = schema::Int32("a", Repetition::OPTIONAL); ColumnDescriptor descr(type, 1, 0); @@ -159,18 +139,11 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) { vector def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1}; vector rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1}; - size_t num_values = values.size(); - parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN; - - vector page1; - test::DataPageBuilder page_builder(&page1); - - // Definition levels precede the values - page_builder.AppendRepLevels(rep_levels, 1, parquet::Encoding::RLE); - page_builder.AppendDefLevels(def_levels, 2, parquet::Encoding::RLE); - 
page_builder.AppendValues(values, parquet::Encoding::PLAIN); + std::vector buffer; + std::shared_ptr page = MakeDataPage(values, + def_levels, 2, rep_levels, 1, &buffer); - pages_.push_back(page_builder.Finish()); + pages_.push_back(page); NodePtr type = schema::Int32("a", Repetition::REPEATED); ColumnDescriptor descr(type, 2, 1); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/serialized-page.cc ---------------------------------------------------------------------- diff --git a/src/parquet/column/serialized-page.cc b/src/parquet/column/serialized-page.cc index 1cbaf4d..b9d470c 100644 --- a/src/parquet/column/serialized-page.cc +++ b/src/parquet/column/serialized-page.cc @@ -21,7 +21,6 @@ #include "parquet/exception.h" #include "parquet/thrift/util.h" -#include "parquet/util/input_stream.h" using parquet::PageType; @@ -52,7 +51,7 @@ std::shared_ptr SerializedPageReader::NextPage() { // Loop here because there may be unhandled page types that we skip until // finding a page that we do know what to do with while (true) { - int bytes_read = 0; + int64_t bytes_read = 0; const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read); if (bytes_read == 0) { return std::shared_ptr(nullptr); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/serialized-page.h ---------------------------------------------------------------------- diff --git a/src/parquet/column/serialized-page.h b/src/parquet/column/serialized-page.h index 2735c3c..c02152f 100644 --- a/src/parquet/column/serialized-page.h +++ b/src/parquet/column/serialized-page.h @@ -27,7 +27,7 @@ #include "parquet/column/page.h" #include "parquet/compression/codec.h" -#include "parquet/util/input_stream.h" +#include "parquet/util/input.h" #include "parquet/thrift/parquet_types.h" namespace parquet_cpp { http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/test-util.h 
---------------------------------------------------------------------- diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h index 8861134..1cbcf8c 100644 --- a/src/parquet/column/test-util.h +++ b/src/parquet/column/test-util.h @@ -52,26 +52,22 @@ class MockPageReader : public PageReader { size_t page_index_; }; -// TODO(wesm): this is only used for testing for now - -static constexpr int DEFAULT_DATA_PAGE_SIZE = 64 * 1024; -static constexpr int INIT_BUFFER_SIZE = 1024; +// TODO(wesm): this is only used for testing for now. Refactor to form part of +// primary file write path template class DataPageBuilder { public: typedef typename type_traits::value_type T; - // The passed vector is the owner of the page's data - explicit DataPageBuilder(std::vector* out) : - out_(out), - buffer_size_(0), + // This class writes data and metadata to the passed inputs + explicit DataPageBuilder(InMemoryOutputStream* sink, parquet::DataPageHeader* header) : + sink_(sink), + header_(header), num_values_(0), have_def_levels_(false), have_rep_levels_(false), have_values_(false) { - out_->resize(INIT_BUFFER_SIZE); - buffer_capacity_ = INIT_BUFFER_SIZE; } void AppendDefLevels(const std::vector& levels, @@ -79,7 +75,7 @@ class DataPageBuilder { AppendLevels(levels, max_level, encoding); num_values_ = std::max(levels.size(), num_values_); - header_.__set_definition_level_encoding(encoding); + header_->__set_definition_level_encoding(encoding); have_def_levels_ = true; } @@ -88,7 +84,7 @@ class DataPageBuilder { AppendLevels(levels, max_level, encoding); num_values_ = std::max(levels.size(), num_values_); - header_.__set_repetition_level_encoding(encoding); + header_->__set_repetition_level_encoding(encoding); have_rep_levels_ = true; } @@ -98,53 +94,31 @@ class DataPageBuilder { ParquetException::NYI("only plain encoding currently implemented"); } size_t bytes_to_encode = values.size() * sizeof(T); - Reserve(bytes_to_encode); PlainEncoder encoder(nullptr); - size_t 
nbytes = encoder.Encode(&values[0], values.size(), Head()); - // In case for some reason it's fewer than bytes_to_encode - buffer_size_ += nbytes; + encoder.Encode(&values[0], values.size(), sink_); num_values_ = std::max(values.size(), num_values_); - header_.__set_encoding(encoding); + header_->__set_encoding(encoding); have_values_ = true; } - std::shared_ptr Finish() { + void Finish() { if (!have_values_) { throw ParquetException("A data page must at least contain values"); } - header_.__set_num_values(num_values_); - return std::make_shared(&(*out_)[0], buffer_size_, header_); + header_->__set_num_values(num_values_); } private: - std::vector* out_; - - size_t buffer_size_; - size_t buffer_capacity_; - - parquet::DataPageHeader header_; + InMemoryOutputStream* sink_; + parquet::DataPageHeader* header_; size_t num_values_; - bool have_def_levels_; bool have_rep_levels_; bool have_values_; - void Reserve(size_t nbytes) { - while ((nbytes + buffer_size_) > buffer_capacity_) { - // TODO(wesm): limit to one reserve when this loop runs more than once - size_t new_capacity = 2 * buffer_capacity_; - out_->resize(new_capacity); - buffer_capacity_ = new_capacity; - } - } - - uint8_t* Head() { - return &(*out_)[buffer_size_]; - } - // Used internally for both repetition and definition levels void AppendLevels(const std::vector& levels, int16_t max_level, parquet::Encoding::type encoding) { @@ -153,9 +127,11 @@ class DataPageBuilder { } // TODO: compute a more precise maximum size for the encoded levels - std::vector encode_buffer(DEFAULT_DATA_PAGE_SIZE); - + std::vector encode_buffer(levels.size() * 4); + // We encode into separate memory from the output stream because the + // RLE-encoded bytes have to be preceded in the stream by their absolute + // size. 
LevelEncoder encoder; encoder.Init(encoding, max_level, levels.size(), encode_buffer.data(), encode_buffer.size()); @@ -163,15 +139,43 @@ class DataPageBuilder { encoder.Encode(levels.size(), levels.data()); uint32_t rle_bytes = encoder.len(); - size_t levels_footprint = sizeof(uint32_t) + rle_bytes; - Reserve(levels_footprint); - - *reinterpret_cast(Head()) = rle_bytes; - memcpy(Head() + sizeof(uint32_t), encode_buffer.data(), rle_bytes); - buffer_size_ += levels_footprint; + sink_->Write(reinterpret_cast(&rle_bytes), sizeof(uint32_t)); + sink_->Write(encode_buffer.data(), rle_bytes); } }; +template +static std::shared_ptr MakeDataPage(const std::vector& values, + const std::vector& def_levels, int16_t max_def_level, + const std::vector& rep_levels, int16_t max_rep_level, + std::vector* out_buffer) { + size_t num_values = values.size(); + + InMemoryOutputStream page_stream; + parquet::DataPageHeader page_header; + + test::DataPageBuilder page_builder(&page_stream, &page_header); + + if (!rep_levels.empty()) { + page_builder.AppendRepLevels(rep_levels, max_rep_level, + parquet::Encoding::RLE); + } + + if (!def_levels.empty()) { + page_builder.AppendDefLevels(def_levels, max_def_level, + parquet::Encoding::RLE); + } + + page_builder.AppendValues(values, parquet::Encoding::PLAIN); + page_builder.Finish(); + + // Hand off the data stream to the passed std::vector + page_stream.Transfer(out_buffer); + + return std::make_shared(&(*out_buffer)[0], out_buffer->size(), page_header); +} + + } // namespace test } // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/encodings/encodings.h ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h index 21754d1..46c61b6 100644 --- a/src/parquet/encodings/encodings.h +++ b/src/parquet/encodings/encodings.h @@ -23,6 +23,7 @@ #include "parquet/exception.h" #include "parquet/types.h" 
+#include "parquet/util/output.h" #include "parquet/util/rle-encoding.h" #include "parquet/util/bit-stream-utils.inline.h" @@ -82,14 +83,9 @@ class Encoder { virtual ~Encoder() {} - // TODO(wesm): use an output stream - // Subclasses should override the ones they support - // - // @returns: the number of bytes written to dst - virtual size_t Encode(const T* src, int num_values, uint8_t* dst) { + virtual void Encode(const T* src, int num_values, OutputStream* dst) { throw ParquetException("Encoder does not implement this type."); - return 0; } const parquet::Encoding::type encoding() const { return encoding_; } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/encodings/plain-encoding-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc index ca425dd..16862b8 100644 --- a/src/parquet/encodings/plain-encoding-test.cc +++ b/src/parquet/encodings/plain-encoding-test.cc @@ -43,15 +43,18 @@ TEST(BooleanTest, TestEncodeDecode) { PlainEncoder encoder(nullptr); PlainDecoder decoder(nullptr); - std::vector encode_buffer(nbytes); + InMemoryOutputStream dst; + encoder.Encode(draws, nvalues, &dst); - size_t encoded_bytes = encoder.Encode(draws, nvalues, &encode_buffer[0]); - ASSERT_EQ(nbytes, encoded_bytes); + std::vector encode_buffer; + dst.Transfer(&encode_buffer); + + ASSERT_EQ(nbytes, encode_buffer.size()); std::vector decode_buffer(nbytes); const uint8_t* decode_data = &decode_buffer[0]; - decoder.SetData(nvalues, &encode_buffer[0], encoded_bytes); + decoder.SetData(nvalues, &encode_buffer[0], encode_buffer.size()); size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues); ASSERT_EQ(nvalues, values_decoded); http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/encodings/plain-encoding.h ---------------------------------------------------------------------- diff --git 
a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h index 03f5940..a450eb4 100644 --- a/src/parquet/encodings/plain-encoding.h +++ b/src/parquet/encodings/plain-encoding.h @@ -147,7 +147,7 @@ class PlainEncoder : public Encoder { explicit PlainEncoder(const ColumnDescriptor* descr) : Encoder(descr, parquet::Encoding::PLAIN) {} - virtual size_t Encode(const T* src, int num_values, uint8_t* dst); + virtual void Encode(const T* src, int num_values, OutputStream* dst); }; template <> @@ -156,43 +156,46 @@ class PlainEncoder : public Encoder { explicit PlainEncoder(const ColumnDescriptor* descr) : Encoder(descr, parquet::Encoding::PLAIN) {} - virtual size_t Encode(const bool* src, int num_values, uint8_t* dst) { + virtual void Encode(const bool* src, int num_values, OutputStream* dst) { throw ParquetException("this API for encoding bools not implemented"); - return 0; } - size_t Encode(const std::vector& src, int num_values, - uint8_t* dst) { + void Encode(const std::vector& src, int num_values, OutputStream* dst) { size_t bytes_required = BitUtil::RoundUp(num_values, 8) / 8; - BitWriter bit_writer(dst, bytes_required); + + // TODO(wesm) + // Use a temporary buffer for now and copy, because the BitWriter is not + // aware of OutputStream. 
Later we can add some kind of Request/Flush API + // to OutputStream + std::vector tmp_buffer(bytes_required); + + BitWriter bit_writer(&tmp_buffer[0], bytes_required); for (size_t i = 0; i < num_values; ++i) { bit_writer.PutValue(src[i], 1); } bit_writer.Flush(); - return bit_writer.bytes_written(); + + // Write the result to the output stream + dst->Write(bit_writer.buffer(), bit_writer.bytes_written()); } }; template -inline size_t PlainEncoder::Encode(const T* buffer, int num_values, - uint8_t* dst) { - size_t nbytes = num_values * sizeof(T); - memcpy(dst, buffer, nbytes); - return nbytes; +inline void PlainEncoder::Encode(const T* buffer, int num_values, + OutputStream* dst) { + dst->Write(reinterpret_cast(buffer), num_values * sizeof(T)); } template <> -inline size_t PlainEncoder::Encode(const ByteArray* src, - int num_values, uint8_t* dst) { +inline void PlainEncoder::Encode(const ByteArray* src, + int num_values, OutputStream* dst) { ParquetException::NYI("byte array encoding"); - return 0; } template <> -inline size_t PlainEncoder::Encode( - const FixedLenByteArray* src, int num_values, uint8_t* dst) { +inline void PlainEncoder::Encode( + const FixedLenByteArray* src, int num_values, OutputStream* dst) { ParquetException::NYI("FLBA encoding"); - return 0; } } // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/parquet.h ---------------------------------------------------------------------- diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h index 84a32f3..7030d0e 100644 --- a/src/parquet/parquet.h +++ b/src/parquet/parquet.h @@ -29,6 +29,8 @@ #include "parquet/exception.h" #include "parquet/reader.h" #include "parquet/column/reader.h" -#include "parquet/util/input_stream.h" + +#include "parquet/util/input.h" +#include "parquet/util/output.h" #endif http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/reader-test.cc 
---------------------------------------------------------------------- diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc index ffc882c..8da8b99 100644 --- a/src/parquet/reader-test.cc +++ b/src/parquet/reader-test.cc @@ -25,6 +25,7 @@ #include "parquet/reader.h" #include "parquet/column/reader.h" #include "parquet/column/scanner.h" +#include "parquet/util/input.h" using std::string; @@ -47,7 +48,7 @@ class TestAllTypesPlain : public ::testing::Test { void TearDown() {} protected: - LocalFile file_; + LocalFileSource file_; ParquetFileReader reader_; }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/reader.cc ---------------------------------------------------------------------- diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc index 2f30ebf..3fcce90 100644 --- a/src/parquet/reader.cc +++ b/src/parquet/reader.cc @@ -31,7 +31,6 @@ #include "parquet/exception.h" #include "parquet/schema/converter.h" #include "parquet/thrift/util.h" -#include "parquet/util/input_stream.h" using std::string; using std::vector; @@ -39,48 +38,6 @@ using std::vector; namespace parquet_cpp { // ---------------------------------------------------------------------- -// LocalFile methods - -LocalFile::~LocalFile() { - CloseFile(); -} - -void LocalFile::Open(const std::string& path) { - path_ = path; - file_ = fopen(path_.c_str(), "r"); - is_open_ = true; -} - -void LocalFile::Close() { - // Pure virtual - CloseFile(); -} - -void LocalFile::CloseFile() { - if (is_open_) { - fclose(file_); - is_open_ = false; - } -} - -size_t LocalFile::Size() { - fseek(file_, 0L, SEEK_END); - return Tell(); -} - -void LocalFile::Seek(size_t pos) { - fseek(file_, pos, SEEK_SET); -} - -size_t LocalFile::Tell() { - return ftell(file_); -} - -size_t LocalFile::Read(size_t nbytes, uint8_t* buffer) { - return fread(buffer, 1, nbytes, file_); -} - -// ---------------------------------------------------------------------- // RowGroupReader std::shared_ptr 
RowGroupReader::Column(size_t i) { @@ -102,7 +59,7 @@ std::shared_ptr RowGroupReader::Column(size_t i) { std::unique_ptr input( new ScopedInMemoryInputStream(col.meta_data.total_compressed_size)); - FileLike* source = this->parent_->buffer_; + RandomAccessSource* source = this->parent_->buffer_; source->Seek(col_start); @@ -141,7 +98,7 @@ ParquetFileReader::ParquetFileReader() : ParquetFileReader::~ParquetFileReader() {} -void ParquetFileReader::Open(FileLike* buffer) { +void ParquetFileReader::Open(RandomAccessSource* buffer) { buffer_ = buffer; } http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/reader.h ---------------------------------------------------------------------- diff --git a/src/parquet/reader.h b/src/parquet/reader.h index ea23182..3a9dc5d 100644 --- a/src/parquet/reader.h +++ b/src/parquet/reader.h @@ -27,53 +27,12 @@ #include "parquet/thrift/parquet_types.h" #include "parquet/types.h" - #include "parquet/schema/descriptor.h" +#include "parquet/util/input.h" namespace parquet_cpp { class ColumnReader; - -class FileLike { - public: - virtual ~FileLike() {} - - virtual void Close() = 0; - virtual size_t Size() = 0; - virtual size_t Tell() = 0; - virtual void Seek(size_t pos) = 0; - - // Returns actual number of bytes read - virtual size_t Read(size_t nbytes, uint8_t* out) = 0; -}; - - -class LocalFile : public FileLike { - public: - LocalFile() : file_(nullptr), is_open_(false) {} - virtual ~LocalFile(); - - void Open(const std::string& path); - - virtual void Close(); - virtual size_t Size(); - virtual size_t Tell(); - virtual void Seek(size_t pos); - - // Returns actual number of bytes read - virtual size_t Read(size_t nbytes, uint8_t* out); - - bool is_open() const { return is_open_;} - const std::string& path() const { return path_;} - - private: - void CloseFile(); - - std::string path_; - FILE* file_; - bool is_open_; -}; - class ParquetFileReader; class RowGroupReader { @@ -112,7 +71,7 @@ class ParquetFileReader { // 
This class does _not_ take ownership of the file. You must manage its // lifetime separately - void Open(FileLike* buffer); + void Open(RandomAccessSource* buffer); void Close(); @@ -150,7 +109,7 @@ class ParquetFileReader { // Row group index -> RowGroupReader std::unordered_map > row_group_readers_; - FileLike* buffer_; + RandomAccessSource* buffer_; }; http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt index 046a7c9..504069f 100644 --- a/src/parquet/util/CMakeLists.txt +++ b/src/parquet/util/CMakeLists.txt @@ -27,11 +27,13 @@ install(FILES macros.h rle-encoding.h stopwatch.h - input_stream.h + input.h + output.h DESTINATION include/parquet/util) add_library(parquet_util STATIC - input_stream.cc + input.cc + output.cc cpu-info.cc ) @@ -54,4 +56,5 @@ if(PARQUET_BUILD_TESTS) endif() ADD_PARQUET_TEST(bit-util-test) +ADD_PARQUET_TEST(output-test) ADD_PARQUET_TEST(rle-test) http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc new file mode 100644 index 0000000..0e4b833 --- /dev/null +++ b/src/parquet/util/input.cc @@ -0,0 +1,110 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "parquet/util/input.h" + +#include +#include + +#include "parquet/exception.h" + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// LocalFileSource + +LocalFileSource::~LocalFileSource() { + CloseFile(); +} + +void LocalFileSource::Open(const std::string& path) { + path_ = path; + file_ = fopen(path_.c_str(), "r"); + is_open_ = true; +} + +void LocalFileSource::Close() { + // Pure virtual + CloseFile(); +} + +void LocalFileSource::CloseFile() { + if (is_open_) { + fclose(file_); + is_open_ = false; + } +} + +size_t LocalFileSource::Size() { + fseek(file_, 0L, SEEK_END); + return Tell(); +} + +void LocalFileSource::Seek(size_t pos) { + fseek(file_, pos, SEEK_SET); +} + +size_t LocalFileSource::Tell() { + return ftell(file_); +} + +size_t LocalFileSource::Read(size_t nbytes, uint8_t* buffer) { + return fread(buffer, 1, nbytes, file_); +} + +// ---------------------------------------------------------------------- +// InMemoryInputStream + +InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) : + buffer_(buffer), len_(len), offset_(0) {} + +const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) { + *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_); + return buffer_ + offset_; +} + +const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) { + const uint8_t* result = Peek(num_to_read, num_bytes); + offset_ += *num_bytes; + return result; +} + +// 
---------------------------------------------------------------------- +// ScopedInMemoryInputStream:: like InMemoryInputStream but owns its memory + +ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) { + buffer_.resize(len); + stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size())); +} + +uint8_t* ScopedInMemoryInputStream::data() { + return buffer_.data(); +} + +int64_t ScopedInMemoryInputStream::size() { + return buffer_.size(); +} + +const uint8_t* ScopedInMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) { + return stream_->Peek(num_to_peek, num_bytes); +} + +const uint8_t* ScopedInMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) { + return stream_->Read(num_to_read, num_bytes); +} + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h new file mode 100644 index 0000000..4fd9cd7 --- /dev/null +++ b/src/parquet/util/input.h @@ -0,0 +1,128 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#ifndef PARQUET_UTIL_INPUT_H +#define PARQUET_UTIL_INPUT_H + +#include +#include +#include +#include + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// Random access input (e.g. file-like) + +// Random +class RandomAccessSource { + public: + virtual ~RandomAccessSource() {} + + virtual void Close() = 0; + virtual size_t Size() = 0; + virtual size_t Tell() = 0; + virtual void Seek(size_t pos) = 0; + + // Returns actual number of bytes read + virtual size_t Read(size_t nbytes, uint8_t* out) = 0; +}; + + +class LocalFileSource : public RandomAccessSource { + public: + LocalFileSource() : file_(nullptr), is_open_(false) {} + virtual ~LocalFileSource(); + + void Open(const std::string& path); + + virtual void Close(); + virtual size_t Size(); + virtual size_t Tell(); + virtual void Seek(size_t pos); + + // Returns actual number of bytes read + virtual size_t Read(size_t nbytes, uint8_t* out); + + bool is_open() const { return is_open_;} + const std::string& path() const { return path_;} + + private: + void CloseFile(); + + std::string path_; + FILE* file_; + bool is_open_; +}; + +// ---------------------------------------------------------------------- +// Streaming input interfaces + +// Interface for the column reader to get the bytes. The interface is a stream +// interface, meaning the bytes in order and once a byte is read, it does not +// need to be read again. +class InputStream { + public: + // Returns the next 'num_to_peek' without advancing the current position. + // *num_bytes will contain the number of bytes returned which can only be + // less than num_to_peek at end of stream cases. + // Since the position is not advanced, calls to this function are idempotent. + // The buffer returned to the caller is still owned by the input stream and must + // stay valid until the next call to Peek() or Read(). 
+ virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0; + + // Identical to Peek(), except the current position in the stream is advanced by + // *num_bytes. + virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0; + + virtual ~InputStream() {} + + protected: + InputStream() {} +}; + +// Implementation of an InputStream when all the bytes are in memory. +class InMemoryInputStream : public InputStream { + public: + InMemoryInputStream(const uint8_t* buffer, int64_t len); + virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes); + virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes); + + private: + const uint8_t* buffer_; + int64_t len_; + int64_t offset_; +}; + + +// A wrapper for InMemoryInputStream to manage the memory. +class ScopedInMemoryInputStream : public InputStream { + public: + explicit ScopedInMemoryInputStream(int64_t len); + uint8_t* data(); + int64_t size(); + virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes); + virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes); + + private: + std::vector<uint8_t> buffer_; + std::unique_ptr<InMemoryInputStream> stream_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_UTIL_INPUT_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input_stream.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/input_stream.cc b/src/parquet/util/input_stream.cc deleted file mode 100644 index 281a342..0000000 --- a/src/parquet/util/input_stream.cc +++ /dev/null @@ -1,63 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License.
You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#include "parquet/util/input_stream.h" - -#include <algorithm> - -#include "parquet/exception.h" - -namespace parquet_cpp { - -InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) : - buffer_(buffer), len_(len), offset_(0) {} - -const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) { - *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_); - return buffer_ + offset_; -} - -const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) { - const uint8_t* result = Peek(num_to_read, num_bytes); - offset_ += *num_bytes; - return result; -} - -ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) { - buffer_.resize(len); - stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size())); -} - -uint8_t* ScopedInMemoryInputStream::data() { - return buffer_.data(); -} - -int64_t ScopedInMemoryInputStream::size() { - return buffer_.size(); -} - -const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek, - int* num_bytes) { - return stream_->Peek(num_to_peek, num_bytes); -} - -const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read, - int* num_bytes) { - return stream_->Read(num_to_read, num_bytes); -} - -} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input_stream.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/input_stream.h b/src/parquet/util/input_stream.h deleted file mode 100644 index ece2488..0000000 --- a/src/parquet/util/input_stream.h +++ /dev/null
@@ -1,80 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -#ifndef PARQUET_INPUT_STREAM_H -#define PARQUET_INPUT_STREAM_H - -#include -#include -#include - -namespace parquet_cpp { - -// Interface for the column reader to get the bytes. The interface is a stream -// interface, meaning the bytes in order and once a byte is read, it does not -// need to be read again. -class InputStream { - public: - // Returns the next 'num_to_peek' without advancing the current position. - // *num_bytes will contain the number of bytes returned which can only be - // less than num_to_peek at end of stream cases. - // Since the position is not advanced, calls to this function are idempotent. - // The buffer returned to the caller is still owned by the input stream and must - // stay valid until the next call to Peek() or Read(). - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0; - - // Identical to Peek(), except the current position in the stream is advanced by - // *num_bytes. - virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0; - - virtual ~InputStream() {} - - protected: - InputStream() {} -}; - -// Implementation of an InputStream when all the bytes are in memory. 
-class InMemoryInputStream : public InputStream { - public: - InMemoryInputStream(const uint8_t* buffer, int64_t len); - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); - virtual const uint8_t* Read(int num_to_read, int* num_bytes); - - private: - const uint8_t* buffer_; - int64_t len_; - int64_t offset_; -}; - - -// A wrapper for InMemoryInputStream to manage the memory. -class ScopedInMemoryInputStream : public InputStream { - public: - explicit ScopedInMemoryInputStream(int64_t len); - uint8_t* data(); - int64_t size(); - virtual const uint8_t* Peek(int num_to_peek, int* num_bytes); - virtual const uint8_t* Read(int num_to_read, int* num_bytes); - - private: - std::vector<uint8_t> buffer_; - std::unique_ptr<InMemoryInputStream> stream_; -}; - -} // namespace parquet_cpp - -#endif // PARQUET_INPUT_STREAM_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/output-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/output-test.cc b/src/parquet/util/output-test.cc new file mode 100644 index 0000000..84f5b57 --- /dev/null +++ b/src/parquet/util/output-test.cc @@ -0,0 +1,44 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License.
+ +#include + +#include + +#include "parquet/util/output.h" +#include "parquet/util/test-common.h" + +namespace parquet_cpp { + +TEST(TestInMemoryOutputStream, Basics) { + std::unique_ptr stream(new InMemoryOutputStream(8)); + + std::vector data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12}; + + stream->Write(&data[0], 4); + ASSERT_EQ(4, stream->Tell()); + stream->Write(&data[4], data.size() - 4); + + std::vector out; + stream->Transfer(&out); + + test::assert_vector_equal(data, out); + + ASSERT_EQ(0, stream->Tell()); +} + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/output.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/output.cc b/src/parquet/util/output.cc new file mode 100644 index 0000000..9748a69 --- /dev/null +++ b/src/parquet/util/output.cc @@ -0,0 +1,73 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "parquet/util/output.h" + +#include +#include +#include + +#include "parquet/exception.h" + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// In-memory output stream + +static constexpr int64_t IN_MEMORY_DEFAULT_CAPACITY = 1024; + +InMemoryOutputStream::InMemoryOutputStream(int64_t initial_capacity) : + size_(0), + capacity_(initial_capacity) { + if (initial_capacity == 0) { + initial_capacity = IN_MEMORY_DEFAULT_CAPACITY; + } + buffer_.resize(initial_capacity); +} + +InMemoryOutputStream::InMemoryOutputStream() : + InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY) {} + +uint8_t* InMemoryOutputStream::Head() { + return &buffer_[size_]; +} + +void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) { + if (size_ + length > capacity_) { + int64_t new_capacity = capacity_ * 2; + while (new_capacity < size_ + length) { + new_capacity *= 2; + } + buffer_.resize(new_capacity); + capacity_ = new_capacity; + } + memcpy(Head(), data, length); + size_ += length; +} + +int64_t InMemoryOutputStream::Tell() { + return size_; +} + +void InMemoryOutputStream::Transfer(std::vector* out) { + buffer_.resize(size_); + buffer_.swap(*out); + size_ = 0; + capacity_ = buffer_.size(); +} + +} // namespace parquet_cpp http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/output.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h new file mode 100644 index 0000000..e83b261 --- /dev/null +++ b/src/parquet/util/output.h @@ -0,0 +1,71 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef PARQUET_UTIL_OUTPUT_H +#define PARQUET_UTIL_OUTPUT_H + +#include +#include +#include + +namespace parquet_cpp { + +// ---------------------------------------------------------------------- +// Output stream classes + +// Abstract output stream +class OutputStream { + public: + // Close the output stream + virtual void Close() = 0; + + // Return the current position in the output stream relative to the start + virtual int64_t Tell() = 0; + + // Copy bytes into the output stream + virtual void Write(const uint8_t* data, int64_t length) = 0; +}; + + +// An output stream that is an in-memory +class InMemoryOutputStream : public OutputStream { + public: + InMemoryOutputStream(); + explicit InMemoryOutputStream(int64_t initial_capacity); + + // Close is currently a no-op with the in-memory stream + virtual void Close() {} + + virtual int64_t Tell(); + + virtual void Write(const uint8_t* data, int64_t length); + + // Hand off the in-memory data to a (preferably-empty) std::vector owner + void Transfer(std::vector* out); + + private: + // Mutable pointer to the current write position in the stream + uint8_t* Head(); + + std::vector buffer_; + int64_t size_; + int64_t capacity_; +}; + +} // namespace parquet_cpp + +#endif // PARQUET_UTIL_OUTPUT_H http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/test-common.h 
---------------------------------------------------------------------- diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h index 3cf82f5..84519d6 100644 --- a/src/parquet/util/test-common.h +++ b/src/parquet/util/test-common.h @@ -29,6 +29,16 @@ namespace parquet_cpp { namespace test { template +static inline void assert_vector_equal(const vector& left, + const vector& right) { + ASSERT_EQ(left.size(), right.size()); + + for (size_t i = 0; i < left.size(); ++i) { + ASSERT_EQ(left[i], right[i]) << i; + } +} + +template static inline bool vector_equal(const vector& left, const vector& right) { if (left.size() != right.size()) { return false; @@ -47,6 +57,19 @@ static inline bool vector_equal(const vector& left, const vector& right) { return true; } +template +static vector slice(const vector& values, size_t start, size_t end) { + if (end < start) { + return vector(0); + } + + vector out(end - start); + for (size_t i = start; i < end; ++i) { + out[i - start] = values[i]; + } + return out; +} + static inline vector flip_coins_seed(size_t n, double p, uint32_t seed) { std::mt19937 gen(seed); std::bernoulli_distribution d(p);