Return-Path: X-Original-To: archive-asf-public-internal@cust-asf2.ponee.io Delivered-To: archive-asf-public-internal@cust-asf2.ponee.io Received: from cust-asf.ponee.io (cust-asf.ponee.io [163.172.22.183]) by cust-asf2.ponee.io (Postfix) with ESMTP id AC8A32009EE for ; Wed, 18 May 2016 19:55:30 +0200 (CEST) Received: by cust-asf.ponee.io (Postfix) id AB188160A00; Wed, 18 May 2016 17:55:30 +0000 (UTC) Delivered-To: archive-asf-public@cust-asf.ponee.io Received: from mail.apache.org (hermes.apache.org [140.211.11.3]) by cust-asf.ponee.io (Postfix) with SMTP id CCB741609B0 for ; Wed, 18 May 2016 19:55:29 +0200 (CEST) Received: (qmail 82438 invoked by uid 500); 18 May 2016 17:55:29 -0000 Mailing-List: contact commits-help@parquet.apache.org; run by ezmlm Precedence: bulk List-Help: List-Unsubscribe: List-Post: List-Id: Reply-To: dev@parquet.apache.org Delivered-To: mailing list commits@parquet.apache.org Received: (qmail 82429 invoked by uid 99); 18 May 2016 17:55:29 -0000 Received: from git1-us-west.apache.org (HELO git1-us-west.apache.org) (140.211.11.23) by apache.org (qpsmtpd/0.29) with ESMTP; Wed, 18 May 2016 17:55:29 +0000 Received: by git1-us-west.apache.org (ASF Mail Server at git1-us-west.apache.org, from userid 33) id CCF14DFC74; Wed, 18 May 2016 17:55:28 +0000 (UTC) Content-Type: text/plain; charset="us-ascii" MIME-Version: 1.0 Content-Transfer-Encoding: 7bit From: wesm@apache.org To: commits@parquet.apache.org Message-Id: X-Mailer: ASF-Git Admin Mailer Subject: parquet-cpp git commit: PARQUET-619: Add OutputStream for local files Date: Wed, 18 May 2016 17:55:28 +0000 (UTC) archived-at: Wed, 18 May 2016 17:55:30 -0000 Repository: parquet-cpp Updated Branches: refs/heads/master 45ca7bab9 -> 41981d981 PARQUET-619: Add OutputStream for local files Author: Uwe L. Korn Closes #107 from xhochy/parquet-619 and squashes the following commits: 612a2f2 [Uwe L. Korn] Check that the correct data was written 66fb32a [Uwe L. Korn] Add unit test for LocalFileOutputStream a1179d1 [Uwe L. Korn] Add missing header 6ecb940 [Uwe L. Korn] Add OutputStream for local files Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/41981d98 Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/41981d98 Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/41981d98 Branch: refs/heads/master Commit: 41981d981051f7f4f318f6edc6d3d9526d764489 Parents: 45ca7ba Author: Uwe L. Korn Authored: Wed May 18 10:54:45 2016 -0700 Committer: Wes McKinney Committed: Wed May 18 10:54:45 2016 -0700 ---------------------------------------------------------------------- src/parquet/util/input-output-test.cc | 33 ++++++++++++++++++++ src/parquet/util/output.cc | 50 ++++++++++++++++++++++++++++++ src/parquet/util/output.h | 27 ++++++++++++++++ 3 files changed, 110 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41981d98/src/parquet/util/input-output-test.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc index a98ae08..72aad9c 100644 --- a/src/parquet/util/input-output-test.cc +++ b/src/parquet/util/input-output-test.cc @@ -208,4 +208,37 @@ TYPED_TEST(TestFileReaders, BadSeek) { ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException); } +class TestFileWriter : public ::testing::Test { + public: + void SetUp() { + test_path_ = "parquet-input-output-test.txt"; + if (file_exists(test_path_)) { std::remove(test_path_.c_str()); } + } + + void TearDown() { DeleteTestFile(); } + + void DeleteTestFile() { + if (file_exists(test_path_)) { std::remove(test_path_.c_str()); } + } + + protected: + std::string test_path_; + uint8_t test_data_[4] = {1, 2, 3, 4}; +}; + +TEST_F(TestFileWriter, Write) { + LocalFileOutputStream sink(test_path_); + ASSERT_EQ(0, sink.Tell()); + sink.Write(test_data_, 4); + ASSERT_EQ(4, sink.Tell()); + sink.Close(); + + // Check that the correct content was written + LocalFileSource source; + source.Open(test_path_); + std::shared_ptr buffer = source.Read(4); + ASSERT_EQ(4, buffer->size()); + ASSERT_EQ(0, memcmp(test_data_, buffer->data(), 4)); +} + } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41981d98/src/parquet/util/output.cc ---------------------------------------------------------------------- diff --git a/src/parquet/util/output.cc b/src/parquet/util/output.cc index 1d27355..4f024a5 100644 --- a/src/parquet/util/output.cc +++ b/src/parquet/util/output.cc @@ -26,6 +26,11 @@ namespace parquet { // ---------------------------------------------------------------------- +// OutputStream + +OutputStream::~OutputStream() {} + +// ---------------------------------------------------------------------- // In-memory output stream InMemoryOutputStream::InMemoryOutputStream( @@ -35,6 +40,8 @@ InMemoryOutputStream::InMemoryOutputStream( buffer_.reset(new OwnedMutableBuffer(initial_capacity, allocator)); } +InMemoryOutputStream::~InMemoryOutputStream() {} + uint8_t* InMemoryOutputStream::Head() { return buffer_->mutable_data() + size_; } @@ -63,4 +70,47 @@ std::shared_ptr InMemoryOutputStream::GetBuffer() { return result; } +// ---------------------------------------------------------------------- +// local file output stream + +LocalFileOutputStream::LocalFileOutputStream(const std::string& path) : is_open_(true) { + file_ = fopen(path.c_str(), "wb"); + if (file_ == nullptr || ferror(file_)) { + std::stringstream ss; + ss << "Unable to open file: " << path; + throw ParquetException(ss.str()); + } +} + +LocalFileOutputStream::~LocalFileOutputStream() { + CloseFile(); +} + +void LocalFileOutputStream::Close() { + CloseFile(); +} + +int64_t LocalFileOutputStream::Tell() { + DCHECK(is_open_); + int64_t position = ftell(file_); + if (position < 0) { throw ParquetException("ftell failed, did the file disappear?"); } + return position; +} + +void LocalFileOutputStream::Write(const uint8_t* data, int64_t length) { + DCHECK(is_open_); + int64_t bytes_written = fwrite(data, sizeof(uint8_t), length, file_); + if (bytes_written != length) { + int error_code = ferror(file_); + throw ParquetException("fwrite failed, error code: " + std::to_string(error_code)); + } +} + +void LocalFileOutputStream::CloseFile() { + if (is_open_) { + fclose(file_); + is_open_ = false; + } +} + } // namespace parquet http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/41981d98/src/parquet/util/output.h ---------------------------------------------------------------------- diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h index 472a9cc..a7cb773 100644 --- a/src/parquet/util/output.h +++ b/src/parquet/util/output.h @@ -20,6 +20,7 @@ #include #include +#include #include "parquet/util/macros.h" #include "parquet/util/mem-allocator.h" @@ -35,6 +36,8 @@ class ResizableBuffer; // Abstract output stream class OutputStream { public: + virtual ~OutputStream(); + // Close the output stream virtual void Close() = 0; @@ -53,6 +56,8 @@ class InMemoryOutputStream : public OutputStream { explicit InMemoryOutputStream(int64_t initial_capacity = IN_MEMORY_DEFAULT_CAPACITY, MemoryAllocator* allocator = default_allocator()); + virtual ~InMemoryOutputStream(); + // Close is currently a no-op with the in-memory stream virtual void Close() {} @@ -74,6 +79,28 @@ class InMemoryOutputStream : public OutputStream { DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream); }; +class LocalFileOutputStream : public OutputStream { + public: + explicit LocalFileOutputStream(const std::string& path); + + virtual ~LocalFileOutputStream(); + + // Close the output stream + void Close() override; + + // Return the current position in the output stream relative to the start + int64_t Tell() override; + + // Copy bytes into the output stream + void Write(const uint8_t* data, int64_t length) override; + + private: + void CloseFile(); + + FILE* file_; + bool is_open_; +}; + } // namespace parquet #endif // PARQUET_UTIL_OUTPUT_H